# cidadao.ai-backend/src/api/routes/orchestration.py
# anderson-ufrj
# fix(imports): correct User import in orchestration route
# 7d69dd0
"""
API routes for agent orchestration.
"""
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from uuid import uuid4
from src.core import get_logger
from src.api.auth import User
from src.api.dependencies import get_current_user
from src.services.agent_orchestrator import (
AgentOrchestrator,
WorkflowDefinition,
WorkflowStep,
OrchestrationPattern,
get_orchestrator
)
from src.agents.deodoro import AgentContext
from src.core.exceptions import OrchestrationError
# Router that collects the orchestration endpoints declared in this module.
router = APIRouter()
# Module-level logger; the handlers below use it to report failures.
logger = get_logger("api.orchestration")
class WorkflowStepRequest(BaseModel):
    """Request model for a single workflow step.

    Every field is copied one-to-one into a ``WorkflowStep`` by the
    ``/workflows/execute`` handler; this model does no interpretation of its
    own.
    """
    # Identifier of the step within its workflow.
    step_id: str
    # Name of the agent that should run the step.
    agent_name: str
    # Action the agent is asked to perform.
    action: str
    # Mapping/condition/retry dictionaries default to empty and are forwarded
    # unchanged; their key semantics are defined by the orchestrator, not here.
    input_mapping: Dict[str, str] = Field(default_factory=dict)
    output_mapping: Dict[str, str] = Field(default_factory=dict)
    conditions: Dict[str, Any] = Field(default_factory=dict)
    retry_config: Dict[str, Any] = Field(default_factory=dict)
    # Per-step timeout; presumably seconds -- TODO confirm against WorkflowStep.
    timeout: int = 300
class WorkflowRequest(BaseModel):
    """Request model for workflow execution.

    Consumed by the ``/workflows/execute`` handler, which turns it into a
    ``WorkflowDefinition`` and runs it.
    """
    # Optional caller-supplied id; the handler generates a UUID when omitted.
    workflow_id: Optional[str] = None
    # Human-readable workflow name.
    name: str
    # Converted to ``OrchestrationPattern`` by the handler, so it must be one
    # of that enum's values.
    pattern: str = "sequential"
    # Steps of the workflow, in the order given by the caller.
    steps: List[WorkflowStepRequest]
    # Payload handed to the orchestrator as the workflow's starting data.
    initial_data: Dict[str, Any]
    # Whole-workflow timeout; presumably seconds -- TODO confirm.
    timeout: int = 1800
class ConditionalWorkflowRequest(BaseModel):
    """Request model for a conditional (branching) workflow.

    Both fields are passed as-is to
    ``orchestrator.execute_conditional_workflow``.
    """
    # Free-form workflow description; its schema is defined by the
    # orchestrator and is not validated by this model.
    workflow_definition: Dict[str, Any]
    # Payload handed to the orchestrator as the workflow's starting data.
    initial_data: Dict[str, Any]
class CapabilitySearchRequest(BaseModel):
    """Request model for searching agents by capability."""
    # Capabilities an agent must ALL provide to be considered a match.
    required_capabilities: List[str]
    # When true and at least one agent qualifies, the response also carries a
    # single "best_match" chosen by the orchestrator.
    prefer_single_agent: bool = True
@router.post("/workflows/execute")
async def execute_workflow(
    request: WorkflowRequest,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Execute an orchestrated workflow.

    Builds a ``WorkflowDefinition`` from the request, registers it on the
    orchestrator, creates a fresh ``AgentContext`` for the current user and
    awaits the workflow's execution.

    Args:
        request: Workflow description and its initial data.
        background_tasks: Injected by FastAPI; not used by this handler.
        current_user: Authenticated user; its ``id`` is stored in the context.
        orchestrator: Orchestrator that registers and runs the workflow.

    Returns:
        A dict with ``"status"``, the ``"workflow_id"`` that was used (the
        caller's or a generated UUID) and the orchestrator's ``"result"``.

    Raises:
        HTTPException: 400 when ``request.pattern`` is not a valid
            ``OrchestrationPattern`` or the orchestrator raises
            ``OrchestrationError``; 500 for any other failure.
    """
    # An unknown pattern is a client mistake. Validate it before the broad
    # try-block so it is reported as 400 instead of being swallowed by the
    # generic handler and returned as "Internal server error".
    try:
        pattern = OrchestrationPattern(request.pattern)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pattern: {request.pattern}"
        ) from None

    try:
        # Create workflow definition
        workflow_def = WorkflowDefinition(
            workflow_id=request.workflow_id or str(uuid4()),
            name=request.name,
            pattern=pattern,
            steps=[
                WorkflowStep(
                    step_id=step.step_id,
                    agent_name=step.agent_name,
                    action=step.action,
                    input_mapping=step.input_mapping,
                    output_mapping=step.output_mapping,
                    conditions=step.conditions,
                    retry_config=step.retry_config,
                    timeout=step.timeout
                )
                for step in request.steps
            ],
            timeout=request.timeout
        )

        # Register workflow so the orchestrator can look it up by id.
        orchestrator._workflows[workflow_def.workflow_id] = workflow_def

        # Create context: new investigation/session ids for every execution.
        context = AgentContext(
            investigation_id=str(uuid4()),
            user_id=current_user.id,
            session_id=str(uuid4()),
            metadata={
                "workflow_id": workflow_def.workflow_id,
                "workflow_name": workflow_def.name,
                "pattern": workflow_def.pattern.value
            }
        )

        # Execute workflow
        result = await orchestrator.execute_workflow(
            workflow_def.workflow_id,
            request.initial_data,
            context
        )

        return {
            "status": "success",
            "workflow_id": workflow_def.workflow_id,
            "result": result
        }
    except OrchestrationError as e:
        logger.error(f"Orchestration error: {e}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Unexpected error in workflow execution: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.post("/workflows/conditional")
async def execute_conditional_workflow(
    request: ConditionalWorkflowRequest,
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Execute a conditional workflow with branching.

    Creates a fresh ``AgentContext`` for the current user and hands the
    caller's workflow definition and initial data to
    ``orchestrator.execute_conditional_workflow``.

    Args:
        request: Workflow definition and its initial data.
        current_user: Authenticated user; its ``id`` is stored in the context.
        orchestrator: Orchestrator that runs the workflow.

    Returns:
        A dict with ``"status"``, the ``"execution_path"`` returned by the
        orchestrator and ``"total_steps"`` (the length of that path).

    Raises:
        HTTPException: 400 when the orchestrator raises
            ``OrchestrationError`` (same mapping as ``/workflows/execute``);
            500 for any other failure.
    """
    try:
        # Create context: new investigation/session ids for every execution.
        context = AgentContext(
            investigation_id=str(uuid4()),
            user_id=current_user.id,
            session_id=str(uuid4()),
            metadata={
                "workflow_type": "conditional",
                "workflow_definition": request.workflow_definition
            }
        )

        # Execute conditional workflow
        execution_path = await orchestrator.execute_conditional_workflow(
            request.workflow_definition,
            request.initial_data,
            context
        )

        return {
            "status": "success",
            "execution_path": execution_path,
            "total_steps": len(execution_path)
        }
    except OrchestrationError as e:
        # Orchestration failures are reported as client errors, consistent
        # with the /workflows/execute endpoint.
        logger.error(f"Orchestration error: {e}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error in conditional workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/agents/discover")
async def discover_agents(
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """List every agent known to the orchestrator, with its capabilities.

    Each entry returned by ``orchestrator.discover_agents()`` is reduced to
    its name, description (default ``""``), status (default ``"available"``)
    and the capabilities the orchestrator has recorded under that name.
    Any failure is logged and reported as HTTP 500.
    """
    try:
        # One summary dict per discovered agent, capabilities looked up by name.
        catalogue = [
            {
                "name": entry["name"],
                "description": entry.get("description", ""),
                "capabilities": orchestrator._agent_capabilities.get(entry["name"], []),
                "status": entry.get("status", "available")
            }
            for entry in await orchestrator.discover_agents()
        ]
        return {
            "status": "success",
            "total_agents": len(catalogue),
            "agents": catalogue
        }
    except Exception as e:
        logger.error(f"Error discovering agents: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/agents/find-by-capability")
async def find_agents_by_capability(
    request: CapabilitySearchRequest,
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Return the agents that provide every requested capability.

    Agents are collected per capability, then kept only if their recorded
    capabilities include all of ``request.required_capabilities``. When
    ``prefer_single_agent`` is set and at least one agent qualifies, the
    response also contains the orchestrator's single best pick; otherwise it
    lists the qualifying agents and their count. Any failure is logged and
    reported as HTTP 500.
    """
    try:
        wanted = request.required_capabilities

        # Ask the orchestrator once per requested capability.
        matches_by_capability = {}
        for capability in wanted:
            matches_by_capability[capability] = (
                await orchestrator.find_agents_with_capability(capability)
            )

        # Every agent name that matched at least one capability.
        candidate_names = {
            found["name"]
            for found_list in matches_by_capability.values()
            for found in found_list
        }

        # Keep only the candidates that cover the full requirement list.
        qualified = []
        for name in candidate_names:
            owned = orchestrator._agent_capabilities.get(name, [])
            if any(cap not in owned for cap in wanted):
                continue
            qualified.append({
                "name": name,
                "capabilities": owned,
                "match_score": len(set(wanted).intersection(owned))
            })

        # Highest score first (stable for equal scores).
        qualified = sorted(
            qualified, key=lambda item: item["match_score"], reverse=True
        )

        if not (request.prefer_single_agent and qualified):
            return {
                "status": "success",
                "matching_agents": qualified,
                "total_matches": len(qualified)
            }

        # Single-agent mode: let the orchestrator choose its best match.
        best_agent = await orchestrator.select_best_agent(wanted)
        if best_agent:
            best_match = {
                "name": best_agent.name,
                "capabilities": orchestrator._agent_capabilities.get(best_agent.name, [])
            }
        else:
            best_match = {"name": None, "capabilities": []}

        return {
            "status": "success",
            "best_match": best_match,
            "all_matches": qualified
        }
    except Exception as e:
        logger.error(f"Error finding agents: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/orchestrator/stats")
async def get_orchestrator_stats(
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Return whatever ``orchestrator.get_stats()`` reports.

    The statistics are wrapped under ``"statistics"`` next to a
    ``"status": "success"`` marker. Any failure is logged and reported as
    HTTP 500.
    """
    try:
        metrics = await orchestrator.get_stats()
    except Exception as e:
        logger.error(f"Error getting orchestrator stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    else:
        return {
            "status": "success",
            "statistics": metrics
        }
def _build_pattern_steps(pattern: str) -> List[WorkflowStep]:
    """Return the predefined workflow steps used for *pattern*.

    ``"map_reduce"`` and ``"fan_out_fan_in"`` have dedicated step lists; every
    other pattern gets a single ``zumbi``/``analyze`` step.
    """
    if pattern == "map_reduce":
        return [
            WorkflowStep(step_id="map", agent_name="zumbi", action="analyze"),
            WorkflowStep(step_id="reduce", agent_name="anita", action="aggregate"),
        ]
    if pattern == "fan_out_fan_in":
        return [
            WorkflowStep(step_id="analyze1", agent_name="zumbi", action="analyze"),
            WorkflowStep(
                step_id="analyze2",
                agent_name="maria_quiteria",
                action="security_audit"
            ),
            WorkflowStep(
                step_id="analyze3",
                agent_name="bonifacio",
                action="policy_analysis"
            ),
        ]
    return [WorkflowStep(step_id="step1", agent_name="zumbi", action="analyze")]


@router.post("/workflows/patterns/{pattern}")
async def execute_pattern_workflow(
    pattern: str,
    data: Dict[str, Any],
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Execute a specific orchestration pattern.

    Builds a workflow from the predefined steps for ``pattern``, registers it
    on the orchestrator under a generated id and runs it with ``data`` as the
    initial payload.

    Args:
        pattern: Path parameter; must be a valid ``OrchestrationPattern``
            value.
        data: Request body passed to the orchestrator as initial data.
        current_user: Authenticated user; its ``id`` is stored in the context.
        orchestrator: Orchestrator that registers and runs the workflow.

    Returns:
        A dict with ``"status"``, the requested ``"pattern"`` and the
        orchestrator's ``"result"``.

    Raises:
        HTTPException: 400 when ``pattern`` is not a valid
            ``OrchestrationPattern`` or the orchestrator raises
            ``OrchestrationError``; 500 for any other failure.
    """
    # Validate the pattern BEFORE the broad try-block. HTTPException is an
    # Exception subclass, so raising it inside that block would be caught by
    # the generic handler and reach the client as a 500 instead of a 400.
    try:
        pattern_enum = OrchestrationPattern(pattern)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pattern: {pattern}"
        ) from None

    try:
        # Create a simple workflow for the pattern
        workflow = WorkflowDefinition(
            workflow_id=f"{pattern}_{uuid4()}",
            name=f"{pattern} workflow",
            pattern=pattern_enum,
            steps=_build_pattern_steps(pattern)
        )
        orchestrator._workflows[workflow.workflow_id] = workflow

        # Create context: new investigation/session ids for every execution.
        context = AgentContext(
            investigation_id=str(uuid4()),
            user_id=current_user.id,
            session_id=str(uuid4()),
            metadata={"pattern": pattern}
        )

        # Execute
        result = await orchestrator.execute_workflow(
            workflow.workflow_id,
            data,
            context
        )

        return {
            "status": "success",
            "pattern": pattern,
            "result": result
        }
    except OrchestrationError as e:
        # Orchestration failures are reported as client errors, consistent
        # with the /workflows/execute endpoint.
        logger.error(f"Orchestration error: {e}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error executing pattern workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/workflows")
async def list_workflows(
    current_user: User = Depends(get_current_user),
    orchestrator: AgentOrchestrator = Depends(get_orchestrator)
):
    """Summarise every workflow currently registered on the orchestrator.

    Each summary holds the workflow's id, name, pattern value, number of
    steps and timeout. Any failure is logged and reported as HTTP 500.
    """
    try:
        # One summary dict per registered workflow, in registry order.
        summaries = [
            {
                "workflow_id": registered_id,
                "name": definition.name,
                "pattern": definition.pattern.value,
                "steps": len(definition.steps),
                "timeout": definition.timeout
            }
            for registered_id, definition in orchestrator._workflows.items()
        ]
        return {
            "status": "success",
            "total_workflows": len(summaries),
            "workflows": summaries
        }
    except Exception as e:
        logger.error(f"Error listing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))