""" CQRS API endpoints for command and query operations. This module provides RESTful endpoints that use the CQRS pattern for better scalability and separation of concerns. """ from typing import Dict, Any, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from pydantic import BaseModel from src.core import get_logger from src.api.auth import get_current_user from src.infrastructure.cqrs.commands import ( CommandBus, CreateInvestigationCommand, UpdateInvestigationCommand, CancelInvestigationCommand, ExecuteAgentTaskCommand, SendChatMessageCommand ) from src.infrastructure.cqrs.queries import ( QueryBus, GetInvestigationByIdQuery, SearchInvestigationsQuery, GetInvestigationStatsQuery, SearchContractsQuery, GetAgentPerformanceQuery ) from src.infrastructure.events.event_bus import get_event_bus logger = get_logger(__name__) router = APIRouter(prefix="/api/v1/cqrs", tags=["CQRS"]) # Request/Response models class CreateInvestigationRequest(BaseModel): """Request to create investigation.""" query: str data_sources: Optional[list[str]] = None priority: str = "medium" class UpdateInvestigationRequest(BaseModel): """Request to update investigation.""" status: str results: Optional[Dict[str, Any]] = None class SearchInvestigationsRequest(BaseModel): """Request to search investigations.""" filters: Dict[str, Any] = {} sort_by: str = "created_at" sort_order: str = "desc" limit: int = 20 offset: int = 0 class SearchContractsRequest(BaseModel): """Request to search contracts.""" search_term: Optional[str] = None orgao: Optional[str] = None min_value: Optional[float] = None max_value: Optional[float] = None year: Optional[int] = None limit: int = 50 offset: int = 0 class ExecuteAgentTaskRequest(BaseModel): """Request to execute agent task.""" agent_name: str task_type: str payload: Dict[str, Any] timeout: Optional[float] = None # Global instances command_bus: Optional[CommandBus] = None query_bus: Optional[QueryBus] = None async def get_command_bus() -> CommandBus: """Get command bus instance.""" global command_bus if command_bus is None: event_bus = await get_event_bus() command_bus = CommandBus(event_bus) return command_bus async def get_query_bus() -> QueryBus: """Get query bus instance.""" global query_bus if query_bus is None: query_bus = QueryBus() return query_bus # Command endpoints @router.post("/investigations", response_model=Dict[str, Any]) async def create_investigation( request: CreateInvestigationRequest, background_tasks: BackgroundTasks, current_user = Depends(get_current_user), cmd_bus: CommandBus = Depends(get_command_bus) ): """ Create a new investigation using CQRS command. This endpoint demonstrates the command side of CQRS: - Accepts write operations - Publishes events - Returns minimal response """ command = CreateInvestigationCommand( user_id=current_user["sub"], query=request.query, data_sources=request.data_sources, priority=request.priority ) result = await cmd_bus.execute(command) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "investigation_id": result.data["investigation_id"], "command_id": result.command_id, "events_published": result.events_published } @router.put("/investigations/{investigation_id}", response_model=Dict[str, Any]) async def update_investigation( investigation_id: str, request: UpdateInvestigationRequest, current_user = Depends(get_current_user), cmd_bus: CommandBus = Depends(get_command_bus) ): """Update investigation status.""" command = UpdateInvestigationCommand( user_id=current_user["sub"], investigation_id=investigation_id, status=request.status, results=request.results ) result = await cmd_bus.execute(command) if not result.success: raise HTTPException(status_code=400, detail=result.error) return {"success": True, "command_id": result.command_id} @router.delete("/investigations/{investigation_id}", response_model=Dict[str, Any]) async def cancel_investigation( investigation_id: str, reason: Optional[str] = None, current_user = Depends(get_current_user), cmd_bus: CommandBus = Depends(get_command_bus) ): """Cancel an investigation.""" command = CancelInvestigationCommand( user_id=current_user["sub"], investigation_id=investigation_id, reason=reason ) result = await cmd_bus.execute(command) if not result.success: raise HTTPException(status_code=400, detail=result.error) return {"success": True, "command_id": result.command_id} @router.post("/agents/execute", response_model=Dict[str, Any]) async def execute_agent_task( request: ExecuteAgentTaskRequest, background_tasks: BackgroundTasks, current_user = Depends(get_current_user), cmd_bus: CommandBus = Depends(get_command_bus) ): """Execute an agent task.""" command = ExecuteAgentTaskCommand( user_id=current_user["sub"], agent_name=request.agent_name, task_type=request.task_type, payload=request.payload, timeout=request.timeout ) result = await cmd_bus.execute(command) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "success": True, "command_id": result.command_id, "task_id": result.data.get("task_id") if result.data else None } # Query endpoints @router.get("/investigations/{investigation_id}", response_model=Dict[str, Any]) async def get_investigation( investigation_id: str, include_findings: bool = True, include_anomalies: bool = True, current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """ Get investigation by ID using CQRS query. This endpoint demonstrates the query side of CQRS: - Optimized for reads - Uses caching - Returns denormalized data """ query = GetInvestigationByIdQuery( user_id=current_user["sub"], investigation_id=investigation_id, include_findings=include_findings, include_anomalies=include_anomalies ) result = await q_bus.execute(query) if not result.success: raise HTTPException(status_code=404, detail=result.error) return { "investigation": result.data, "from_cache": result.from_cache, "execution_time_ms": result.execution_time_ms } @router.post("/investigations/search", response_model=Dict[str, Any]) async def search_investigations( request: SearchInvestigationsRequest, current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """Search investigations with filters.""" query = SearchInvestigationsQuery( user_id=current_user["sub"], filters=request.filters, sort_by=request.sort_by, sort_order=request.sort_order, limit=request.limit, offset=request.offset ) result = await q_bus.execute(query) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "investigations": result.data, "metadata": result.metadata, "execution_time_ms": result.execution_time_ms } @router.get("/investigations/stats", response_model=Dict[str, Any]) async def get_investigation_stats( date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """Get investigation statistics.""" query = GetInvestigationStatsQuery( user_id=current_user["sub"], date_from=date_from, date_to=date_to ) result = await q_bus.execute(query) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "stats": result.data, "from_cache": result.from_cache, "execution_time_ms": result.execution_time_ms } @router.post("/contracts/search", response_model=Dict[str, Any]) async def search_contracts( request: SearchContractsRequest, current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """Search contracts with filters.""" query = SearchContractsQuery( user_id=current_user["sub"], search_term=request.search_term, orgao=request.orgao, min_value=request.min_value, max_value=request.max_value, year=request.year, limit=request.limit, offset=request.offset, use_cache=True, cache_ttl=300 # 5 minutes ) result = await q_bus.execute(query) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "contracts": result.data, "from_cache": result.from_cache, "execution_time_ms": result.execution_time_ms } @router.get("/agents/performance", response_model=Dict[str, Any]) async def get_agent_performance( agent_name: Optional[str] = None, time_period: str = "1h", current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """Get agent performance metrics.""" query = GetAgentPerformanceQuery( user_id=current_user["sub"], agent_name=agent_name, time_period=time_period, use_cache=True, cache_ttl=60 # 1 minute for recent metrics ) result = await q_bus.execute(query) if not result.success: raise HTTPException(status_code=400, detail=result.error) return { "performance": result.data, "from_cache": result.from_cache, "execution_time_ms": result.execution_time_ms } # Bus statistics endpoints @router.get("/stats/commands", response_model=Dict[str, Any]) async def get_command_bus_stats( current_user = Depends(get_current_user), cmd_bus: CommandBus = Depends(get_command_bus) ): """Get command bus statistics.""" return cmd_bus.get_stats() @router.get("/stats/queries", response_model=Dict[str, Any]) async def get_query_bus_stats( current_user = Depends(get_current_user), q_bus: QueryBus = Depends(get_query_bus) ): """Get query bus statistics.""" return q_bus.get_stats() # Health check @router.get("/health", response_model=Dict[str, Any]) async def cqrs_health_check(): """Check CQRS system health.""" try: cmd_bus = await get_command_bus() q_bus = await get_query_bus() event_bus = await get_event_bus() return { "status": "healthy", "command_bus": "ready", "query_bus": "ready", "event_bus": "ready", "event_bus_stats": event_bus.get_stats() } except Exception as e: return { "status": "unhealthy", "error": str(e) }