|
|
""" |
|
|
CQRS API endpoints for command and query operations. |
|
|
|
|
|
This module provides RESTful endpoints that use the CQRS pattern |
|
|
for better scalability and separation of concerns. |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from src.core import get_logger |
|
|
from src.api.auth import get_current_user |
|
|
from src.infrastructure.cqrs.commands import ( |
|
|
CommandBus, |
|
|
CreateInvestigationCommand, |
|
|
UpdateInvestigationCommand, |
|
|
CancelInvestigationCommand, |
|
|
ExecuteAgentTaskCommand, |
|
|
SendChatMessageCommand |
|
|
) |
|
|
from src.infrastructure.cqrs.queries import ( |
|
|
QueryBus, |
|
|
GetInvestigationByIdQuery, |
|
|
SearchInvestigationsQuery, |
|
|
GetInvestigationStatsQuery, |
|
|
SearchContractsQuery, |
|
|
GetAgentPerformanceQuery |
|
|
) |
|
|
from src.infrastructure.events.event_bus import get_event_bus |
|
|
|
|
|
# Module-level logger, created with this module's dotted name.
logger = get_logger(__name__)

# Router that groups every endpoint below under the /api/v1/cqrs prefix.
router = APIRouter(prefix="/api/v1/cqrs", tags=["CQRS"])
|
|
|
|
|
|
|
|
|
|
|
class CreateInvestigationRequest(BaseModel):
    """Body accepted by the endpoint that creates an investigation."""

    # Investigation query text (required).
    query: str
    # Optional list of data source strings; None when omitted.
    data_sources: Optional[list[str]] = None
    # Priority label; "medium" when omitted.
    priority: str = "medium"
|
|
|
|
|
|
|
|
class UpdateInvestigationRequest(BaseModel):
    """Body accepted by the endpoint that updates an investigation."""

    # New status value (required).
    status: str
    # Optional free-form results mapping; None when omitted.
    results: Optional[dict[str, Any]] = None
|
|
|
|
|
|
|
|
class SearchInvestigationsRequest(BaseModel):
    """Body accepted by the investigation search endpoint."""

    # Free-form filter mapping; empty when omitted.
    # NOTE(review): mutable default - pydantic is documented to copy field
    # defaults per instance; confirm for the pinned pydantic version.
    filters: dict[str, Any] = {}
    # Name of the field to sort on.
    sort_by: str = "created_at"
    # Sort direction; defaults to "desc".
    sort_order: str = "desc"
    # Paging: page size and starting offset.
    limit: int = 20
    offset: int = 0
|
|
|
|
|
|
|
|
class SearchContractsRequest(BaseModel):
    """Body accepted by the contract search endpoint.

    Every filter is optional and defaults to None.
    """

    # Free-text search term.
    search_term: Optional[str] = None
    # Presumably the government body ("orgao") to filter by - confirm.
    orgao: Optional[str] = None
    # Value bounds for the filter.
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    # Year filter.
    year: Optional[int] = None
    # Paging: page size and starting offset.
    limit: int = 50
    offset: int = 0
|
|
|
|
|
|
|
|
class ExecuteAgentTaskRequest(BaseModel):
    """Body accepted by the endpoint that executes an agent task."""

    # Name of the agent that should run the task (required).
    agent_name: str
    # Task type string (required).
    task_type: str
    # Free-form task payload (required).
    payload: dict[str, Any]
    # Optional timeout; None when omitted (units not visible here).
    timeout: Optional[float] = None
|
|
|
|
|
|
|
|
|
|
|
# Module-level bus instances. Both start as None and are filled in on first
# use by get_command_bus() and get_query_bus() respectively.
command_bus: Optional[CommandBus] = None
query_bus: Optional[QueryBus] = None
|
|
|
|
|
|
|
|
async def get_command_bus() -> CommandBus:
    """Return the shared command bus, creating it on first use.

    The instance is cached in the module-level ``command_bus`` variable and
    is constructed from the event bus returned by ``get_event_bus()``.

    Returns:
        The cached ``CommandBus`` instance.
    """
    global command_bus

    if command_bus is None:
        event_bus = await get_event_bus()
        # The await above can yield control, so another coroutine may have
        # completed initialisation in the meantime. Re-check before assigning
        # so concurrent first callers end up sharing a single CommandBus
        # instead of the later one silently replacing the earlier one.
        if command_bus is None:
            command_bus = CommandBus(event_bus)

    return command_bus
|
|
|
|
|
|
|
|
async def get_query_bus() -> QueryBus:
    """Return the shared query bus, building it the first time it is requested.

    The instance is cached in the module-level ``query_bus`` variable.
    """
    global query_bus

    # Fast path: already initialised.
    if query_bus is not None:
        return query_bus

    query_bus = QueryBus()
    return query_bus
|
|
|
|
|
|
|
|
|
|
|
@router.post("/investigations", response_model=Dict[str, Any])
async def create_investigation(
    request: CreateInvestigationRequest,
    background_tasks: BackgroundTasks,
    current_user=Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus),
):
    """
    Create an investigation by dispatching a CQRS command.

    Command side of CQRS: the request is turned into a
    ``CreateInvestigationCommand`` for the authenticated user and executed
    on the command bus. Only identifiers and the published-event
    information from the command result are returned.

    Raises:
        HTTPException: 400 when the command result is not successful.
    """
    outcome = await cmd_bus.execute(
        CreateInvestigationCommand(
            user_id=current_user["sub"],
            query=request.query,
            data_sources=request.data_sources,
            priority=request.priority,
        )
    )

    if outcome.success:
        return {
            "investigation_id": outcome.data["investigation_id"],
            "command_id": outcome.command_id,
            "events_published": outcome.events_published,
        }

    raise HTTPException(status_code=400, detail=outcome.error)
|
|
|
|
|
|
|
|
@router.put("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def update_investigation(
    investigation_id: str,
    request: UpdateInvestigationRequest,
    current_user=Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus),
):
    """
    Apply a status (and optional results) update to an investigation.

    Raises:
        HTTPException: 400 when the command result is not successful.
    """
    outcome = await cmd_bus.execute(
        UpdateInvestigationCommand(
            user_id=current_user["sub"],
            investigation_id=investigation_id,
            status=request.status,
            results=request.results,
        )
    )

    if outcome.success:
        return {"success": True, "command_id": outcome.command_id}

    raise HTTPException(status_code=400, detail=outcome.error)
|
|
|
|
|
|
|
|
@router.delete("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def cancel_investigation(
    investigation_id: str,
    reason: Optional[str] = None,
    current_user=Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus),
):
    """
    Cancel the given investigation, optionally recording a reason.

    Raises:
        HTTPException: 400 when the command result is not successful.
    """
    outcome = await cmd_bus.execute(
        CancelInvestigationCommand(
            user_id=current_user["sub"],
            investigation_id=investigation_id,
            reason=reason,
        )
    )

    if outcome.success:
        return {"success": True, "command_id": outcome.command_id}

    raise HTTPException(status_code=400, detail=outcome.error)
|
|
|
|
|
|
|
|
@router.post("/agents/execute", response_model=Dict[str, Any])
async def execute_agent_task(
    request: ExecuteAgentTaskRequest,
    background_tasks: BackgroundTasks,
    current_user=Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus),
):
    """
    Dispatch an agent task through the command bus.

    The response carries the command id and, when the command result
    has data, the ``task_id`` found in it (otherwise None).

    Raises:
        HTTPException: 400 when the command result is not successful.
    """
    outcome = await cmd_bus.execute(
        ExecuteAgentTaskCommand(
            user_id=current_user["sub"],
            agent_name=request.agent_name,
            task_type=request.task_type,
            payload=request.payload,
            timeout=request.timeout,
        )
    )

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    response = {"success": True, "command_id": outcome.command_id}
    # The result may carry no data at all; report a null task id then.
    if outcome.data:
        response["task_id"] = outcome.data.get("task_id")
    else:
        response["task_id"] = None
    return response
|
|
|
|
|
|
|
|
|
|
|
# NOTE: the static "/investigations/stats" route must be registered BEFORE the
# parameterised "/investigations/{investigation_id}" route. Routes are matched
# in registration order, so declaring it afterwards would make
# GET /investigations/stats resolve to get_investigation with
# investigation_id="stats".
@router.get("/investigations/stats", response_model=Dict[str, Any])
async def get_investigation_stats(
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """
    Get investigation statistics.

    Args:
        date_from: Optional lower bound forwarded to the stats query.
        date_to: Optional upper bound forwarded to the stats query.
        current_user: Authenticated user; its ``"sub"`` entry is used as
            the query's user id.
        q_bus: Query bus used to execute the query.

    Returns:
        A dict with ``stats`` (the query data), ``from_cache`` and
        ``execution_time_ms`` taken from the query result.

    Raises:
        HTTPException: 400 when the query result is not successful.
    """
    query = GetInvestigationStatsQuery(
        user_id=current_user["sub"],
        date_from=date_from,
        date_to=date_to
    )

    result = await q_bus.execute(query)

    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)

    return {
        "stats": result.data,
        "from_cache": result.from_cache,
        "execution_time_ms": result.execution_time_ms
    }


@router.get("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def get_investigation(
    investigation_id: str,
    include_findings: bool = True,
    include_anomalies: bool = True,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """
    Get investigation by ID using CQRS query.

    This endpoint demonstrates the query side of CQRS:
    - Optimized for reads
    - Uses caching
    - Returns denormalized data

    Args:
        investigation_id: Identifier taken from the URL path.
        include_findings: Forwarded to the query; defaults to True.
        include_anomalies: Forwarded to the query; defaults to True.
        current_user: Authenticated user; its ``"sub"`` entry is used as
            the query's user id.
        q_bus: Query bus used to execute the query.

    Returns:
        A dict with ``investigation`` (the query data), ``from_cache`` and
        ``execution_time_ms`` taken from the query result.

    Raises:
        HTTPException: 404 when the query result is not successful.
    """
    query = GetInvestigationByIdQuery(
        user_id=current_user["sub"],
        investigation_id=investigation_id,
        include_findings=include_findings,
        include_anomalies=include_anomalies
    )

    result = await q_bus.execute(query)

    if not result.success:
        raise HTTPException(status_code=404, detail=result.error)

    return {
        "investigation": result.data,
        "from_cache": result.from_cache,
        "execution_time_ms": result.execution_time_ms
    }


@router.post("/investigations/search", response_model=Dict[str, Any])
async def search_investigations(
    request: SearchInvestigationsRequest,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """
    Search investigations with filters.

    Args:
        request: Filters, sort field/order and paging values for the search.
        current_user: Authenticated user; its ``"sub"`` entry is used as
            the query's user id.
        q_bus: Query bus used to execute the query.

    Returns:
        A dict with ``investigations`` (the query data), ``metadata`` and
        ``execution_time_ms`` taken from the query result.

    Raises:
        HTTPException: 400 when the query result is not successful.
    """
    query = SearchInvestigationsQuery(
        user_id=current_user["sub"],
        filters=request.filters,
        sort_by=request.sort_by,
        sort_order=request.sort_order,
        limit=request.limit,
        offset=request.offset
    )

    result = await q_bus.execute(query)

    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)

    return {
        "investigations": result.data,
        "metadata": result.metadata,
        "execution_time_ms": result.execution_time_ms
    }
|
|
|
|
|
|
|
|
@router.post("/contracts/search", response_model=Dict[str, Any])
async def search_contracts(
    request: SearchContractsRequest,
    current_user=Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus),
):
    """
    Run a contract search built from the request's filters.

    The query is issued with ``use_cache=True`` and ``cache_ttl=300``.

    Raises:
        HTTPException: 400 when the query result is not successful.
    """
    outcome = await q_bus.execute(
        SearchContractsQuery(
            user_id=current_user["sub"],
            search_term=request.search_term,
            orgao=request.orgao,
            min_value=request.min_value,
            max_value=request.max_value,
            year=request.year,
            limit=request.limit,
            offset=request.offset,
            use_cache=True,
            cache_ttl=300,
        )
    )

    if outcome.success:
        return {
            "contracts": outcome.data,
            "from_cache": outcome.from_cache,
            "execution_time_ms": outcome.execution_time_ms,
        }

    raise HTTPException(status_code=400, detail=outcome.error)
|
|
|
|
|
|
|
|
@router.get("/agents/performance", response_model=Dict[str, Any])
async def get_agent_performance(
    agent_name: Optional[str] = None,
    time_period: str = "1h",
    current_user=Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus),
):
    """
    Fetch performance metrics for one agent, or without an agent filter.

    The query is issued with ``use_cache=True`` and ``cache_ttl=60``.

    Raises:
        HTTPException: 400 when the query result is not successful.
    """
    outcome = await q_bus.execute(
        GetAgentPerformanceQuery(
            user_id=current_user["sub"],
            agent_name=agent_name,
            time_period=time_period,
            use_cache=True,
            cache_ttl=60,
        )
    )

    if outcome.success:
        return {
            "performance": outcome.data,
            "from_cache": outcome.from_cache,
            "execution_time_ms": outcome.execution_time_ms,
        }

    raise HTTPException(status_code=400, detail=outcome.error)
|
|
|
|
|
|
|
|
|
|
|
@router.get("/stats/commands", response_model=Dict[str, Any])
async def get_command_bus_stats(
    current_user=Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus),
):
    """Expose whatever statistics the command bus reports about itself."""
    stats = cmd_bus.get_stats()
    return stats
|
|
|
|
|
|
|
|
@router.get("/stats/queries", response_model=Dict[str, Any])
async def get_query_bus_stats(
    current_user=Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus),
):
    """Expose whatever statistics the query bus reports about itself."""
    stats = q_bus.get_stats()
    return stats
|
|
|
|
|
|
|
|
|
|
|
@router.get("/health", response_model=Dict[str, Any])
async def cqrs_health_check():
    """
    Check CQRS system health.

    Resolves the command bus, the query bus and the event bus. If all three
    can be obtained (and the event bus stats can be read) the system is
    reported as healthy.

    Returns:
        On success, a dict with ``status`` set to ``"healthy"``, readiness
        markers for each bus and ``event_bus_stats``. On any failure, a dict
        with ``status`` set to ``"unhealthy"`` and the error message; the
        failure is also logged with its traceback.
    """
    try:
        # The buses are resolved only to prove they can be obtained; their
        # values are not needed for the response.
        await get_command_bus()
        await get_query_bus()
        event_bus = await get_event_bus()

        return {
            "status": "healthy",
            "command_bus": "ready",
            "query_bus": "ready",
            "event_bus": "ready",
            "event_bus_stats": event_bus.get_stats()
        }
    except Exception as e:
        # Deliberately broad: a health probe must report failure rather than
        # raise. Log it so the cause is not lost.
        # NOTE(review): this endpoint has no auth dependency and echoes the
        # exception text to the caller - confirm that is acceptable.
        logger.exception("CQRS health check failed")
        return {
            "status": "unhealthy",
            "error": str(e)
        }