neural-thinker's picture
feat: clean HuggingFace deployment with essential files only
824bf31
raw
history blame
13 kB
"""
Audit routes for Cidadão.AI API
Security audit logging and monitoring endpoints
"""
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import io
from src.core.audit import (
audit_logger,
AuditFilter,
AuditEvent,
AuditEventType,
AuditSeverity,
AuditStatistics
)
from src.api.auth import get_current_user, require_admin, User
# Router that collects every endpoint in this module under the "/audit" prefix.
router = APIRouter(prefix="/audit", tags=["audit"])
class AuditEventResponse(BaseModel):
    """Audit event response model.

    Flat, serializable view of one audit event as returned by the event
    endpoints of this router.
    """

    # Identity and classification.
    id: str
    timestamp: datetime
    event_type: str  # populated from the event type enum's ``.value``
    severity: str  # populated from the severity enum's ``.value``
    message: str

    # Actor that triggered the event, when known.
    user_id: Optional[str] = None
    user_email: Optional[str] = None
    user_role: Optional[str] = None

    # Resource the event refers to, when any.
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    resource_name: Optional[str] = None

    # Outcome.
    success: bool
    error_code: Optional[str] = None
    error_message: Optional[str] = None

    # Free-form payload copied from the event's ``details`` attribute.
    details: dict = {}
    # ``model_dump()`` of the event's context object, or None when the event
    # carries no context.
    context: Optional[dict] = None
class AuditQueryRequest(BaseModel):
    """Audit query request model.

    Request body of this router's ``/events/query`` route. The endpoint dumps
    the model and passes every field as a keyword argument to ``AuditFilter``,
    so field names here must match that constructor.
    """

    # Time window; both bounds are optional.
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None

    # Multi-valued classification filters.
    event_types: Optional[List[AuditEventType]] = None
    severity_levels: Optional[List[AuditSeverity]] = None

    # Actor filters.
    user_id: Optional[str] = None
    user_email: Optional[str] = None

    # Resource filters.
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None

    # NOTE(review): the dashboard endpoint passes ``success_only=False`` to
    # fetch failed events; the exact meaning of None/True/False is defined by
    # AuditFilter -- confirm there.
    success_only: Optional[bool] = None
    ip_address: Optional[str] = None

    # Pagination.
    # NOTE(review): no bounds are declared on these fields, whereas the GET
    # /events route caps ``limit`` with ``le=1000`` -- confirm this is intended.
    limit: int = 100
    offset: int = 0
@router.get("/events", response_model=List[AuditEventResponse])
async def get_audit_events(
    start_date: Optional[datetime] = Query(None, description="Start date filter"),
    end_date: Optional[datetime] = Query(None, description="End date filter"),
    event_type: Optional[AuditEventType] = Query(None, description="Event type filter"),
    severity: Optional[AuditSeverity] = Query(None, description="Severity filter"),
    user_email: Optional[str] = Query(None, description="User email filter"),
    resource_type: Optional[str] = Query(None, description="Resource type filter"),
    success_only: Optional[bool] = Query(None, description="Success only filter"),
    limit: int = Query(100, ge=1, le=1000, description="Result limit"),
    offset: int = Query(0, ge=0, description="Result offset"),
    current_user: User = Depends(get_current_user)
):
    """Get audit events (admin only).

    Builds an ``AuditFilter`` from the query parameters, runs it through
    ``audit_logger.query_events`` and returns the matching events as
    ``AuditEventResponse`` objects, in the order the logger returned them.

    Args:
        start_date: Optional start date filter.
        end_date: Optional end date filter.
        event_type: Optional single event type; forwarded as a one-element list.
        severity: Optional single severity; forwarded as a one-element list.
        user_email: Optional user email filter.
        resource_type: Optional resource type filter.
        success_only: Optional success flag forwarded to the filter.
        limit: Page size, validated to the range 1..1000.
        offset: Number of results to skip, validated to be >= 0.
        current_user: Authenticated user; checked with ``require_admin``.

    Returns:
        A list of ``AuditEventResponse`` objects.
    """
    require_admin(current_user)

    # The filter takes lists for type/severity; wrap the single query values.
    filter_options = AuditFilter(
        start_date=start_date,
        end_date=end_date,
        event_types=[event_type] if event_type else None,
        severity_levels=[severity] if severity else None,
        user_email=user_email,
        resource_type=resource_type,
        success_only=success_only,
        limit=limit,
        offset=offset,
    )

    events = await audit_logger.query_events(filter_options)

    # Flatten each event: enums become their string values and the context
    # object (when present) becomes a plain dict.
    return [
        AuditEventResponse(
            id=event.id,
            timestamp=event.timestamp,
            event_type=event.event_type.value,
            severity=event.severity.value,
            message=event.message,
            user_id=event.user_id,
            user_email=event.user_email,
            user_role=event.user_role,
            resource_type=event.resource_type,
            resource_id=event.resource_id,
            resource_name=event.resource_name,
            success=event.success,
            error_code=event.error_code,
            error_message=event.error_message,
            details=event.details,
            context=event.context.model_dump() if event.context else None,
        )
        for event in events
    ]
@router.post("/events/query", response_model=List[AuditEventResponse])
async def query_audit_events(
    query_request: AuditQueryRequest,
    current_user: User = Depends(get_current_user)
):
    """Run an advanced audit query described by a JSON body (admin only).

    Every field of ``query_request`` is forwarded as a keyword argument to
    ``AuditFilter``; the matching events are returned as
    ``AuditEventResponse`` objects in the order the logger produced them.
    """
    require_admin(current_user)

    def _as_response(record):
        # Enum members are flattened to their values; the context object,
        # when present, is dumped to a plain dict.
        dumped_context = record.context.model_dump() if record.context else None
        return AuditEventResponse(
            id=record.id,
            timestamp=record.timestamp,
            event_type=record.event_type.value,
            severity=record.severity.value,
            message=record.message,
            user_id=record.user_id,
            user_email=record.user_email,
            user_role=record.user_role,
            resource_type=record.resource_type,
            resource_id=record.resource_id,
            resource_name=record.resource_name,
            success=record.success,
            error_code=record.error_code,
            error_message=record.error_message,
            details=record.details,
            context=dumped_context,
        )

    criteria = AuditFilter(**query_request.model_dump())
    matches = await audit_logger.query_events(criteria)
    return [_as_response(record) for record in matches]
@router.get("/statistics", response_model=AuditStatistics)
async def get_audit_statistics(
    start_date: Optional[datetime] = Query(None, description="Start date filter"),
    end_date: Optional[datetime] = Query(None, description="End date filter"),
    current_user: User = Depends(get_current_user)
):
    """Return aggregated audit statistics for an optional date range (admin only).

    Both bounds are passed straight through to
    ``audit_logger.get_statistics``.
    """
    require_admin(current_user)
    date_range = {"start_date": start_date, "end_date": end_date}
    return await audit_logger.get_statistics(**date_range)
@router.get("/export")
async def export_audit_events(
    format: str = Query("json", pattern="^(json|csv)$", description="Export format"),
    start_date: Optional[datetime] = Query(None, description="Start date filter"),
    end_date: Optional[datetime] = Query(None, description="End date filter"),
    event_type: Optional[AuditEventType] = Query(None, description="Event type filter"),
    severity: Optional[AuditSeverity] = Query(None, description="Severity filter"),
    user_email: Optional[str] = Query(None, description="User email filter"),
    current_user: User = Depends(get_current_user)
):
    """Export audit events (admin only).

    Builds an ``AuditFilter`` from the query parameters, asks
    ``audit_logger.export_events`` to render the matching events in the
    requested format and returns the result as a file download.

    Args:
        format: Export format, ``"json"`` (default) or ``"csv"``. The name
            shadows the builtin but is the public query-parameter name, so
            it is kept as is.
        start_date: Optional start date filter.
        end_date: Optional end date filter.
        event_type: Optional single event type; forwarded as a one-element list.
        severity: Optional single severity; forwarded as a one-element list.
        user_email: Optional user email filter.
        current_user: Authenticated user; checked with ``require_admin``.

    Returns:
        A ``StreamingResponse`` with a ``Content-Disposition: attachment``
        header and a timestamped filename.
    """
    require_admin(current_user)

    filter_options = AuditFilter(
        start_date=start_date,
        end_date=end_date,
        event_types=[event_type] if event_type else None,
        severity_levels=[severity] if severity else None,
        user_email=user_email,
        limit=10000,  # Allow larger exports
    )

    exported_data = await audit_logger.export_events(filter_options, format)

    # Anything that is not "json" is treated as CSV, as before.
    if format == "json":
        media_type, extension = "application/json", "json"
    else:
        media_type, extension = "text/csv", "csv"

    # Take the timestamp once so the filename is built in a single place.
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"audit_events_{stamp}.{extension}"

    # exported_data is wrapped in StringIO, so it is expected to be a str.
    return StreamingResponse(
        io.StringIO(exported_data),
        media_type=media_type,
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@router.get("/events/{event_id}", response_model=AuditEventResponse)
async def get_audit_event(
    event_id: str,
    current_user: User = Depends(get_current_user)
):
    """Look up a single audit event by its ID (admin only).

    Scans ``audit_logger.events`` for the first entry whose ``id`` equals
    ``event_id``.

    Raises:
        HTTPException: 404 when no event matches.
    """
    require_admin(current_user)

    # First match wins; None when the scan finds nothing.
    event = next(
        (candidate for candidate in audit_logger.events if candidate.id == event_id),
        None,
    )
    if not event:
        raise HTTPException(status_code=404, detail="Audit event not found")

    context_payload = event.context.model_dump() if event.context else None
    return AuditEventResponse(
        id=event.id,
        timestamp=event.timestamp,
        event_type=event.event_type.value,
        severity=event.severity.value,
        message=event.message,
        user_id=event.user_id,
        user_email=event.user_email,
        user_role=event.user_role,
        resource_type=event.resource_type,
        resource_id=event.resource_id,
        resource_name=event.resource_name,
        success=event.success,
        error_code=event.error_code,
        error_message=event.error_message,
        details=event.details,
        context=context_payload,
    )
@router.get("/integrity")
async def verify_audit_integrity(
    current_user: User = Depends(get_current_user)
):
    """Run the audit log integrity check and return its report (admin only).

    The report is whatever ``audit_logger.verify_integrity`` produces; it is
    returned unchanged.
    """
    require_admin(current_user)
    return await audit_logger.verify_integrity()
@router.get("/event-types")
async def get_audit_event_types(
    current_user: User = Depends(get_current_user)
):
    """List every member of ``AuditEventType`` (admin only).

    Each entry carries the member's value, its name, and a human-readable
    description derived from the value (dots and underscores become spaces,
    then title-cased).
    """
    require_admin(current_user)

    catalogue = []
    for member in AuditEventType:
        label = member.value.replace(".", " ").replace("_", " ").title()
        catalogue.append(
            {
                "value": member.value,
                "name": member.name,
                "description": label,
            }
        )
    return {"event_types": catalogue}
@router.get("/severity-levels")
async def get_audit_severity_levels(
    current_user: User = Depends(get_current_user)
):
    """List every member of ``AuditSeverity`` (admin only).

    Each entry carries the member's value, its name, and the title-cased
    value as a description.
    """
    require_admin(current_user)

    catalogue = []
    for level in AuditSeverity:
        catalogue.append(
            {
                "value": level.value,
                "name": level.name,
                "description": level.value.title(),
            }
        )
    return {"severity_levels": catalogue}
@router.get("/dashboard")
async def get_audit_dashboard(
    current_user: User = Depends(get_current_user)
):
    """Assemble the data shown on the audit dashboard (admin only).

    The payload contains statistics for the last 24 hours, 7 days and 30 days,
    up to 10 HIGH/CRITICAL events from the last 24 hours, and up to 10 events
    from the last 24 hours queried with ``success_only=False``.
    """
    require_admin(current_user)

    # All windows are anchored to the same naive-UTC "now".
    reference = datetime.utcnow()
    day_ago = reference - timedelta(hours=24)
    week_ago = reference - timedelta(days=7)
    month_ago = reference - timedelta(days=30)

    daily_stats = await audit_logger.get_statistics(start_date=day_ago)
    weekly_stats = await audit_logger.get_statistics(start_date=week_ago)
    monthly_stats = await audit_logger.get_statistics(start_date=month_ago)

    # Recent high-severity events.
    alert_events = await audit_logger.query_events(
        AuditFilter(
            severity_levels=[AuditSeverity.HIGH, AuditSeverity.CRITICAL],
            start_date=day_ago,
            limit=10,
        )
    )

    # Recent failed events.
    failure_events = await audit_logger.query_events(
        AuditFilter(
            success_only=False,
            start_date=day_ago,
            limit=10,
        )
    )

    alert_rows = []
    for item in alert_events:
        alert_rows.append(
            {
                "id": item.id,
                "timestamp": item.timestamp,
                "event_type": item.event_type.value,
                "severity": item.severity.value,
                "message": item.message,
                "user_email": item.user_email,
            }
        )

    failure_rows = []
    for item in failure_events:
        failure_rows.append(
            {
                "id": item.id,
                "timestamp": item.timestamp,
                "event_type": item.event_type.value,
                "message": item.message,
                "error_message": item.error_message,
                "user_email": item.user_email,
            }
        )

    return {
        "statistics": {
            "last_24h": daily_stats,
            "last_7d": weekly_stats,
            "last_30d": monthly_stats,
        },
        "recent_alerts": alert_rows,
        "recent_failures": failure_rows,
    }
@router.post("/test-event")
async def create_test_audit_event(
    current_user: User = Depends(get_current_user)
):
    """Log a synthetic ADMIN_ACTION audit event (admin only, for testing purposes).

    The event is attributed to the calling user, uses a fixed localhost
    context, and its ID is returned alongside a confirmation message.
    """
    require_admin(current_user)

    # Imported here, as in the original, rather than at module level.
    from src.core.audit import AuditContext

    synthetic_context = AuditContext(
        ip_address="127.0.0.1",
        user_agent="Test Agent",
        host="localhost",
    )

    logged = await audit_logger.log_event(
        event_type=AuditEventType.ADMIN_ACTION,
        message="Test audit event created by administrator",
        severity=AuditSeverity.LOW,
        user_id=current_user.id,
        user_email=current_user.email,
        user_role=current_user.role,
        resource_type="audit",
        resource_id="test",
        details={"test": True, "created_by": current_user.email},
        context=synthetic_context,
    )

    outcome = {"message": "Test audit event created successfully"}
    outcome["event_id"] = logged.id
    return outcome