""" Module: api.app Description: FastAPI application for Cidadão.AI transparency platform Author: Anderson H. Silva Date: 2025-01-24 License: Proprietary - All rights reserved """ import asyncio from contextlib import asynccontextmanager from typing import Dict, Any from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Request from fastapi.middleware.cors import CORSMiddleware # from fastapi.middleware.trustedhost import TrustedHostMiddleware # Disabled for HuggingFace from fastapi.responses import JSONResponse # Swagger UI imports removed - using FastAPI defaults now from fastapi.openapi.utils import get_openapi from src.core import get_logger, settings from src.core.exceptions import CidadaoAIError, create_error_response from src.core.audit import audit_logger, AuditEventType, AuditSeverity, AuditContext from src.api.routes import investigations, analysis, reports, health, auth, oauth, audit, chat, websocket_chat, batch, graphql, cqrs, resilience, observability, chat_simple, chat_stable, chat_optimized, chat_emergency, notifications, agents, orchestration, agent_metrics, visualization, geographic from src.api.v1 import dados_gov from src.api.middleware.rate_limiting import RateLimitMiddleware from src.api.middleware.authentication import AuthenticationMiddleware from src.api.middleware.logging_middleware import LoggingMiddleware from src.api.middleware.security import SecurityMiddleware from src.api.middleware.compression import CompressionMiddleware from src.api.middleware.metrics_middleware import MetricsMiddleware, setup_http_metrics from src.api.middleware.ip_whitelist import IPWhitelistMiddleware from src.api.middleware.rate_limit import RateLimitMiddleware as RateLimitMiddlewareV2 from src.infrastructure.observability import ( CorrelationMiddleware, tracing_manager, initialize_app_info ) logger = get_logger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager with enhanced audit logging.""" # Startup logger.info("cidadao_ai_api_starting") # Log startup event await audit_logger.log_event( event_type=AuditEventType.SYSTEM_STARTUP, message=f"Cidadão.AI API started (env: {settings.app_env})", severity=AuditSeverity.LOW, details={ "version": "1.0.0", "environment": settings.app_env, "debug": settings.debug, "security_enabled": True } ) # Initialize observability tracing_manager.initialize() initialize_app_info( version="1.0.0", environment=settings.app_env, build_info={"deployment": "huggingface"} ) # Setup HTTP metrics setup_http_metrics() # Initialize connection pools from src.db.session import init_database await init_database() # Initialize cache warming scheduler from src.services.cache_warming_service import cache_warming_service warming_task = asyncio.create_task(cache_warming_service.start_warming_scheduler()) # Initialize memory system from src.services.memory_startup import setup_memory_on_startup, periodic_memory_optimization memory_agent = await setup_memory_on_startup() # Start periodic memory optimization if enabled memory_task = None if getattr(settings, "ENABLE_MEMORY_OPTIMIZATION", True): memory_task = asyncio.create_task(periodic_memory_optimization()) yield # Shutdown logger.info("cidadao_ai_api_shutting_down") # Stop cache warming warming_task.cancel() try: await warming_task except asyncio.CancelledError: pass # Stop memory optimization if memory_task: memory_task.cancel() try: await memory_task except asyncio.CancelledError: pass # Cleanup memory system from src.services.memory_startup import cleanup_memory_on_shutdown await cleanup_memory_on_shutdown() # Log shutdown event await audit_logger.log_event( event_type=AuditEventType.SYSTEM_SHUTDOWN, message="Cidadão.AI API shutting down", severity=AuditSeverity.LOW ) # Shutdown observability tracing_manager.shutdown() # Close database connections from src.db.session import close_database await close_database() # Create FastAPI application app = FastAPI( title="Cidadão.AI API", description=""" **Plataforma de Transparência Pública com IA** API para investigação inteligente de dados públicos brasileiros. ## Funcionalidades * **Investigação** - Detecção de anomalias e irregularidades * **Análise** - Padrões e correlações em dados públicos * **Relatórios** - Geração de relatórios em linguagem natural * **Transparência** - Acesso democrático a informações governamentais ## Agentes Especializados * **InvestigatorAgent** - Detecção de anomalias com IA explicável * **AnalystAgent** - Análise de padrões e correlações * **ReporterAgent** - Geração de relatórios inteligentes ## Fontes de Dados * Portal da Transparência do Governo Federal * Contratos, despesas, licitações e convênios públicos * Dados de servidores e empresas sancionadas """, version="1.0.0", contact={ "name": "Cidadão.AI", "url": "https://github.com/anderson-ufrj/cidadao.ai", "email": "contato@cidadao.ai", }, license_info={ "name": "Proprietary", "url": "https://github.com/anderson-ufrj/cidadao.ai/blob/main/LICENSE", }, lifespan=lifespan, docs_url="/docs", # Use default FastAPI docs redoc_url="/redoc", # Use default FastAPI redoc openapi_url="/openapi.json", # Explicit OpenAPI URL root_path="" # Important for proxy environments ) # Add security middleware (order matters!) app.add_middleware(SecurityMiddleware) app.add_middleware(LoggingMiddleware) app.add_middleware(RateLimitMiddleware) # Add compression middleware for mobile optimization app.add_middleware( CompressionMiddleware, minimum_size=1024 # Compress responses > 1KB ) # Add trusted host middleware for production # DISABLED for HuggingFace Spaces - causes issues with proxy headers # if settings.app_env == "production": # app.add_middleware( # TrustedHostMiddleware, # allowed_hosts=["api.cidadao.ai", "*.cidadao.ai", "*.hf.space", "*.huggingface.co"] # ) # else: # app.add_middleware( # TrustedHostMiddleware, # allowed_hosts=["localhost", "127.0.0.1", "*.cidadao.ai", "testserver"] # ) # Enhanced CORS middleware for Vercel integration from src.api.middleware.cors_enhanced import setup_cors setup_cors(app) # Add observability middleware app.add_middleware(CorrelationMiddleware, generate_request_id=True) # Add metrics middleware for automatic HTTP metrics app.add_middleware(MetricsMiddleware) # Add compression middleware from src.api.middleware.compression import add_compression_middleware add_compression_middleware( app, minimum_size=settings.compression_min_size, gzip_level=settings.compression_gzip_level, brotli_quality=settings.compression_brotli_quality, exclude_paths={"/health", "/metrics", "/health/metrics", "/api/v1/ws", "/api/v1/observability"} ) # Add streaming compression middleware from src.api.middleware.streaming_compression import StreamingCompressionMiddleware app.add_middleware( StreamingCompressionMiddleware, minimum_size=256, compression_level=settings.compression_gzip_level, chunk_size=8192 ) # Add IP whitelist middleware (only in production) if settings.is_production or settings.app_env == "staging": app.add_middleware( IPWhitelistMiddleware, enabled=True, excluded_paths=[ "/health", "/healthz", "/ping", "/ready", "/docs", "/redoc", "/openapi.json", "/metrics", "/static", "/favicon.ico", "/_next", "/api/v1/auth/login", "/api/v1/auth/register", "/api/v1/auth/refresh", "/api/v1/public", "/api/v1/webhooks/incoming" ], strict_mode=False # Allow requests if IP can't be determined ) # Add rate limiting middleware v2 app.add_middleware( RateLimitMiddlewareV2, default_tier="free", strategy="sliding_window" ) # Add query tracking middleware for cache optimization from src.api.middleware.query_tracking import QueryTrackingMiddleware app.add_middleware( QueryTrackingMiddleware, tracked_paths=[ "/api/v1/investigations", "/api/v1/contracts", "/api/v1/analysis", "/api/v1/reports" ], sample_rate=0.1 if settings.is_production else 1.0 # 10% sampling in production ) # Custom OpenAPI schema def custom_openapi(): """Generate custom OpenAPI schema.""" if app.openapi_schema: return app.openapi_schema openapi_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, ) # Add custom API info openapi_schema["info"]["x-logo"] = { "url": "https://cidadao.ai/logo.png" } # Add servers - include HuggingFace Spaces openapi_schema["servers"] = [ {"url": "http://localhost:8000", "description": "Development server"}, {"url": "https://api.cidadao.ai", "description": "Production server"}, {"url": "https://neural-thinker-cidadao-ai-backend.hf.space", "description": "HuggingFace Spaces"}, ] # Add security schemes openapi_schema["components"]["securitySchemes"] = { "ApiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-API-Key" }, "BearerAuth": { "type": "http", "scheme": "bearer", "bearerFormat": "JWT" } } app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi # Documentation endpoints are now handled by FastAPI defaults # Include routers with security app.include_router( health.router, prefix="/health", tags=["Health Check"] ) app.include_router( auth.router, prefix="/auth", tags=["Authentication"] ) app.include_router( oauth.router, prefix="/auth/oauth", tags=["OAuth2"] ) app.include_router( audit.router, prefix="/audit", tags=["Audit & Security"] ) app.include_router( investigations.router, prefix="/api/v1/investigations", tags=["Investigations"] ) app.include_router( analysis.router, prefix="/api/v1/analysis", tags=["Analysis"] ) app.include_router( reports.router, prefix="/api/v1/reports", tags=["Reports"] ) from src.api.routes import export app.include_router( export.router, prefix="/api/v1/export", tags=["Export"] ) app.include_router( chat.router, prefix="/api/v1/chat", tags=["Chat"] ) app.include_router( chat_simple.router, prefix="/api/v1/chat", tags=["Chat Simple"] ) app.include_router( chat_stable.router, tags=["Chat Stable"] ) app.include_router( chat_optimized.router, tags=["Chat Optimized"] ) app.include_router( chat_emergency.router, tags=["Chat Emergency"] ) app.include_router( websocket_chat.router, prefix="/api/v1", tags=["WebSocket"] ) app.include_router( batch.router, tags=["Batch Operations"] ) # GraphQL endpoint app.include_router( graphql.router, tags=["GraphQL"] ) # CQRS endpoints app.include_router( cqrs.router, tags=["CQRS"] ) # Resilience monitoring endpoints app.include_router( resilience.router, tags=["Resilience"] ) # Observability monitoring endpoints app.include_router( observability.router, tags=["Observability"] ) app.include_router( notifications.router, tags=["Notifications"] ) # Import and include admin routes from src.api.routes.admin import ip_whitelist as admin_ip_whitelist from src.api.routes.admin import cache_warming as admin_cache_warming from src.api.routes.admin import database_optimization as admin_db_optimization from src.api.routes.admin import compression as admin_compression from src.api.routes.admin import connection_pools as admin_conn_pools from src.api.routes.admin import agent_lazy_loading as admin_lazy_loading from src.api.routes import api_keys app.include_router( admin_ip_whitelist.router, prefix="/api/v1/admin", tags=["Admin - IP Whitelist"] ) app.include_router( admin_cache_warming.router, prefix="/api/v1/admin", tags=["Admin - Cache Warming"] ) app.include_router( admin_db_optimization.router, prefix="/api/v1/admin", tags=["Admin - Database Optimization"] ) app.include_router( admin_compression.router, prefix="/api/v1/admin", tags=["Admin - Compression"] ) app.include_router( admin_conn_pools.router, prefix="/api/v1/admin", tags=["Admin - Connection Pools"] ) app.include_router( admin_lazy_loading.router, prefix="/api/v1/admin", tags=["Admin - Agent Lazy Loading"] ) app.include_router( api_keys.router, prefix="/api/v1", tags=["API Keys"] ) app.include_router( dados_gov.router, prefix="/api/v1", tags=["Dados.gov.br"] ) app.include_router( agents.router, prefix="/api/v1/agents", tags=["AI Agents"] ) app.include_router( orchestration.router, prefix="/api/v1/orchestration", tags=["Agent Orchestration"] ) app.include_router( agent_metrics.router, prefix="/api/v1/metrics", tags=["Agent Metrics"] ) app.include_router( visualization.router, tags=["Data Visualization"] ) app.include_router( geographic.router, tags=["Geographic Data"] ) from src.api.routes import ml_pipeline app.include_router( ml_pipeline.router, tags=["ML Pipeline"] ) # Global exception handler @app.exception_handler(CidadaoAIError) async def cidadao_ai_exception_handler(request, exc: CidadaoAIError): """Handle CidadãoAI custom exceptions.""" logger.error( "api_exception_occurred", error_type=type(exc).__name__, error_message=exc.message, error_details=exc.details, path=request.url.path, method=request.method, ) # Map exception types to HTTP status codes status_code_map = { "ValidationError": 400, "DataNotFoundError": 404, "AuthenticationError": 401, "UnauthorizedError": 403, "RateLimitError": 429, "LLMError": 503, "TransparencyAPIError": 502, "AgentExecutionError": 500, } status_code = status_code_map.get(exc.error_code, 500) error_response = create_error_response(exc, status_code) return JSONResponse( status_code=status_code, content=error_response ) @app.exception_handler(HTTPException) async def http_exception_handler(request, exc: HTTPException): """Enhanced HTTP exception handler with audit logging.""" # Create audit context context = AuditContext( ip_address=request.client.host if request.client else "unknown", user_agent=request.headers.get("user-agent"), host=request.headers.get("host") ) # Log security-related errors if exc.status_code in [401, 403, 429]: await audit_logger.log_event( event_type=AuditEventType.UNAUTHORIZED_ACCESS, message=f"HTTP {exc.status_code}: {exc.detail}", severity=AuditSeverity.MEDIUM if exc.status_code != 429 else AuditSeverity.HIGH, success=False, error_code=str(exc.status_code), error_message=exc.detail, context=context ) logger.warning( "http_exception_occurred", status_code=exc.status_code, detail=exc.detail, path=request.url.path, method=request.method, ) return JSONResponse( status_code=exc.status_code, content={ "status": "error", "status_code": exc.status_code, "error": { "error": "HTTPException", "message": exc.detail, "details": {} } } ) @app.exception_handler(Exception) async def general_exception_handler(request, exc: Exception): """Enhanced general exception handler with audit logging.""" # Log unexpected errors with audit context = AuditContext( ip_address=request.client.host if request.client else "unknown", user_agent=request.headers.get("user-agent"), host=request.headers.get("host") ) await audit_logger.log_event( event_type=AuditEventType.API_ERROR, message=f"Unhandled exception: {str(exc)}", severity=AuditSeverity.HIGH, success=False, error_message=str(exc), details={"error_type": type(exc).__name__}, context=context ) logger.error( "unexpected_exception_occurred", error_type=type(exc).__name__, error_message=str(exc), path=request.url.path, method=request.method, ) # Don't expose internal errors in production if settings.app_env == "production": return JSONResponse( status_code=500, content={ "status": "error", "status_code": 500, "error": { "error": "InternalServerError", "message": "An unexpected error occurred", "details": {} } } ) else: return JSONResponse( status_code=500, content={ "status": "error", "status_code": 500, "error": { "error": "InternalServerError", "message": f"An unexpected error occurred: {str(exc)}", "details": {"error_type": type(exc).__name__} } } ) # Root endpoint @app.get("/", include_in_schema=False) async def root(): """Root endpoint with API information.""" return { "message": "Cidadão.AI - Plataforma de Transparência Pública", "version": "1.0.0", "description": "API para investigação inteligente de dados públicos brasileiros", "documentation": "/docs", "health": "/health", "status": "operational", "portal_integration": "active", "last_update": "2025-01-25 15:00:00 UTC" } # Test Portal endpoint @app.get("/test-portal", include_in_schema=False) async def test_portal(): """Test Portal da Transparência integration status.""" import os from src.services.chat_data_integration import chat_data_integration # Test the service is available integration_available = False try: if chat_data_integration: integration_available = True except: pass return { "portal_integration": "enabled", "api_key_configured": bool(os.getenv("TRANSPARENCY_API_KEY")), "integration_service_available": integration_available, "endpoints": { "chat_message": "/api/v1/chat/message", "chat_stream": "/api/v1/chat/stream", "test_portal": "/api/v1/chat/test-portal/{query}", "debug_status": "/api/v1/chat/debug/portal-status" }, "demo_mode": not bool(os.getenv("TRANSPARENCY_API_KEY")), "example_queries": [ "Liste os últimos 3 contratos do ministério da saúde", "Mostre contratos com valor acima de 1 milhão", "Quais empresas têm mais contratos com o governo?" ] } # API info endpoint @app.get("/api/v1/info", tags=["General"]) async def api_info(): """Get API information and capabilities.""" return { "api": { "name": "Cidadão.AI API", "version": "1.0.0", "description": "Plataforma de transparência pública com IA", }, "agents": { "investigator": { "description": "Detecção de anomalias e irregularidades", "capabilities": [ "Anomalias de preço", "Concentração de fornecedores", "Padrões temporais suspeitos", "Contratos duplicados", "Irregularidades de pagamento" ] }, "analyst": { "description": "Análise de padrões e correlações", "capabilities": [ "Tendências de gastos", "Padrões organizacionais", "Comportamento de fornecedores", "Análise sazonal", "Métricas de eficiência" ] }, "reporter": { "description": "Geração de relatórios inteligentes", "capabilities": [ "Relatórios executivos", "Análise detalhada", "Múltiplos formatos", "Linguagem natural" ] } }, "data_sources": [ "Portal da Transparência", "Contratos públicos", "Despesas governamentais", "Licitações", "Convênios", "Servidores públicos" ], "formats": [ "JSON", "Markdown", "HTML", "PDF (planned)" ] } if __name__ == "__main__": import uvicorn uvicorn.run( "src.api.app:app", host=settings.host, port=settings.port, reload=settings.debug, workers=settings.workers if not settings.debug else 1, log_level=settings.log_level.lower(), )