"""
API routes for agent orchestration.
"""

from typing import Any, Dict, List, Optional
from uuid import uuid4

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field

from src.core import get_logger
from src.api.auth import User
from src.api.dependencies import get_current_user
from src.services.agent_orchestrator import (
    AgentOrchestrator,
    WorkflowDefinition,
    WorkflowStep,
    OrchestrationPattern,
    get_orchestrator
)
from src.agents.deodoro import AgentContext
from src.core.exceptions import OrchestrationError

router = APIRouter()
logger = get_logger("api.orchestration")


class WorkflowStepRequest(BaseModel):
    """Request model for a workflow step."""

    step_id: str
    agent_name: str
    action: str
    input_mapping: Dict[str, str] = Field(default_factory=dict)
    output_mapping: Dict[str, str] = Field(default_factory=dict)
    conditions: Dict[str, Any] = Field(default_factory=dict)
    retry_config: Dict[str, Any] = Field(default_factory=dict)
    timeout: int = 300


class WorkflowRequest(BaseModel):
    """Request model for workflow execution."""

    workflow_id: Optional[str] = None
    name: str
    pattern: str = "sequential"
    steps: List[WorkflowStepRequest]
    initial_data: Dict[str, Any]
    timeout: int = 1800


class ConditionalWorkflowRequest(BaseModel):
    """Request model for a conditional workflow."""

    workflow_definition: Dict[str, Any]
    initial_data: Dict[str, Any]


class CapabilitySearchRequest(BaseModel):
    """Request model for capability search."""

    required_capabilities: List[str]
    prefer_single_agent: bool = True


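# Illustrative request body for POST /workflows/execute, assembled from the models above.
# The field names and defaults come from WorkflowRequest/WorkflowStepRequest; the agent
# names, actions, and mapping values are placeholders and must match whatever the
# orchestrator actually registers:
#
#   {
#       "name": "spending_review",
#       "pattern": "sequential",
#       "steps": [
#           {"step_id": "detect", "agent_name": "zumbi", "action": "analyze"},
#           {"step_id": "summarize", "agent_name": "anita", "action": "aggregate",
#            "input_mapping": {"records": "detect.result"}}
#       ],
#       "initial_data": {"dataset": "contracts-2024"}
#   }

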
@router.post("/workflows/execute") |
|
|
async def execute_workflow( |
|
|
request: WorkflowRequest, |
|
|
background_tasks: BackgroundTasks, |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Execute an orchestrated workflow.""" |
|
|
try: |
|
|
|
|
|
workflow_def = WorkflowDefinition( |
|
|
workflow_id=request.workflow_id or str(uuid4()), |
|
|
name=request.name, |
|
|
pattern=OrchestrationPattern(request.pattern), |
|
|
steps=[ |
|
|
WorkflowStep( |
|
|
step_id=step.step_id, |
|
|
agent_name=step.agent_name, |
|
|
action=step.action, |
|
|
input_mapping=step.input_mapping, |
|
|
output_mapping=step.output_mapping, |
|
|
conditions=step.conditions, |
|
|
retry_config=step.retry_config, |
|
|
timeout=step.timeout |
|
|
) |
|
|
for step in request.steps |
|
|
], |
|
|
timeout=request.timeout |
|
|
) |
|
|
|
|
|
|
|
|
orchestrator._workflows[workflow_def.workflow_id] = workflow_def |
|
|
|
|
|
|
|
|
context = AgentContext( |
|
|
investigation_id=str(uuid4()), |
|
|
user_id=current_user.id, |
|
|
session_id=str(uuid4()), |
|
|
metadata={ |
|
|
"workflow_id": workflow_def.workflow_id, |
|
|
"workflow_name": workflow_def.name, |
|
|
"pattern": workflow_def.pattern.value |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
result = await orchestrator.execute_workflow( |
|
|
workflow_def.workflow_id, |
|
|
request.initial_data, |
|
|
context |
|
|
) |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"workflow_id": workflow_def.workflow_id, |
|
|
"result": result |
|
|
} |
|
|
|
|
|
except OrchestrationError as e: |
|
|
logger.error(f"Orchestration error: {e}") |
|
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected error in workflow execution: {e}") |
|
|
raise HTTPException(status_code=500, detail="Internal server error") |
|
|
|
|
|
|
|
|
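# Minimal client-side sketch for the endpoint above (assumes this router is mounted at
# /api/v1/orchestration and that a bearer token satisfies get_current_user; adjust both
# to the actual application setup):
#
#   import httpx
#
#   response = httpx.post(
#       "http://localhost:8000/api/v1/orchestration/workflows/execute",
#       json=payload,  # a dict shaped like the WorkflowRequest example above
#       headers={"Authorization": "Bearer <token>"},
#   )
#   response.raise_for_status()
#   print(response.json()["workflow_id"])

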
@router.post("/workflows/conditional") |
|
|
async def execute_conditional_workflow( |
|
|
request: ConditionalWorkflowRequest, |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Execute a conditional workflow with branching.""" |
|
|
try: |
|
|
|
|
|
context = AgentContext( |
|
|
investigation_id=str(uuid4()), |
|
|
user_id=current_user.id, |
|
|
session_id=str(uuid4()), |
|
|
metadata={ |
|
|
"workflow_type": "conditional", |
|
|
"workflow_definition": request.workflow_definition |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
execution_path = await orchestrator.execute_conditional_workflow( |
|
|
request.workflow_definition, |
|
|
request.initial_data, |
|
|
context |
|
|
) |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"execution_path": execution_path, |
|
|
"total_steps": len(execution_path) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in conditional workflow: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
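# Note: workflow_definition is accepted as an arbitrary dict and handed straight to
# AgentOrchestrator.execute_conditional_workflow without validation here, so its expected
# structure (branch conditions, step graph, and so on) is defined by the orchestrator,
# not by this route. A request therefore looks like:
#
#   {
#       "workflow_definition": { ... orchestrator-specific definition ... },
#       "initial_data": {"query": "..."}
#   }

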
@router.get("/agents/discover") |
|
|
async def discover_agents( |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Discover all available agents and their capabilities.""" |
|
|
try: |
|
|
agents = await orchestrator.discover_agents() |
|
|
|
|
|
|
|
|
enriched_agents = [] |
|
|
for agent in agents: |
|
|
agent_info = { |
|
|
"name": agent["name"], |
|
|
"description": agent.get("description", ""), |
|
|
"capabilities": orchestrator._agent_capabilities.get(agent["name"], []), |
|
|
"status": agent.get("status", "available") |
|
|
} |
|
|
enriched_agents.append(agent_info) |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"total_agents": len(enriched_agents), |
|
|
"agents": enriched_agents |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error discovering agents: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
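# The response mirrors the dict built above; for example (values illustrative):
#
#   {
#       "status": "success",
#       "total_agents": 2,
#       "agents": [
#           {"name": "zumbi", "description": "...", "capabilities": ["analyze"], "status": "available"},
#           {"name": "anita", "description": "...", "capabilities": ["aggregate"], "status": "available"}
#       ]
#   }

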
@router.post("/agents/find-by-capability") |
|
|
async def find_agents_by_capability( |
|
|
request: CapabilitySearchRequest, |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Find agents with specific capabilities.""" |
|
|
try: |
|
|
|
|
|
capability_matches = {} |
|
|
for capability in request.required_capabilities: |
|
|
matching_agents = await orchestrator.find_agents_with_capability(capability) |
|
|
capability_matches[capability] = matching_agents |
|
|
|
|
|
|
|
|
all_agents = set() |
|
|
for agents in capability_matches.values(): |
|
|
for agent in agents: |
|
|
all_agents.add(agent["name"]) |
|
|
|
|
|
|
|
|
qualified_agents = [] |
|
|
for agent_name in all_agents: |
|
|
agent_capabilities = orchestrator._agent_capabilities.get(agent_name, []) |
|
|
if all(cap in agent_capabilities for cap in request.required_capabilities): |
|
|
qualified_agents.append({ |
|
|
"name": agent_name, |
|
|
"capabilities": agent_capabilities, |
|
|
"match_score": len(set(request.required_capabilities) & set(agent_capabilities)) |
|
|
}) |
|
|
|
|
|
|
|
|
qualified_agents.sort(key=lambda x: x["match_score"], reverse=True) |
|
|
|
|
|
|
|
|
if request.prefer_single_agent and qualified_agents: |
|
|
best_agent = await orchestrator.select_best_agent(request.required_capabilities) |
|
|
return { |
|
|
"status": "success", |
|
|
"best_match": { |
|
|
"name": best_agent.name if best_agent else None, |
|
|
"capabilities": orchestrator._agent_capabilities.get(best_agent.name, []) if best_agent else [] |
|
|
}, |
|
|
"all_matches": qualified_agents |
|
|
} |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"matching_agents": qualified_agents, |
|
|
"total_matches": len(qualified_agents) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error finding agents: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
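# Example request body (capability names are illustrative and must exist in the
# orchestrator's capability registry):
#
#   {"required_capabilities": ["analyze", "security_audit"], "prefer_single_agent": true}
#
# Since only agents holding *all* required capabilities qualify, every entry's
# match_score equals len(required_capabilities); the sort only becomes meaningful if the
# filter is ever relaxed to allow partial matches.

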
@router.get("/orchestrator/stats") |
|
|
async def get_orchestrator_stats( |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Get orchestrator statistics and performance metrics.""" |
|
|
try: |
|
|
stats = await orchestrator.get_stats() |
|
|
return { |
|
|
"status": "success", |
|
|
"statistics": stats |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting orchestrator stats: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@router.post("/workflows/patterns/{pattern}") |
|
|
async def execute_pattern_workflow( |
|
|
pattern: str, |
|
|
data: Dict[str, Any], |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""Execute a specific orchestration pattern.""" |
|
|
try: |
|
|
|
|
|
try: |
|
|
pattern_enum = OrchestrationPattern(pattern) |
|
|
except ValueError: |
|
|
raise HTTPException(status_code=400, detail=f"Invalid pattern: {pattern}") |
|
|
|
|
|
|
|
|
if pattern == "map_reduce": |
|
|
steps = [ |
|
|
WorkflowStep( |
|
|
step_id="map", |
|
|
agent_name="zumbi", |
|
|
action="analyze" |
|
|
), |
|
|
WorkflowStep( |
|
|
step_id="reduce", |
|
|
agent_name="anita", |
|
|
action="aggregate" |
|
|
) |
|
|
] |
|
|
elif pattern == "fan_out_fan_in": |
|
|
steps = [ |
|
|
WorkflowStep( |
|
|
step_id="analyze1", |
|
|
agent_name="zumbi", |
|
|
action="analyze" |
|
|
), |
|
|
WorkflowStep( |
|
|
step_id="analyze2", |
|
|
agent_name="maria_quiteria", |
|
|
action="security_audit" |
|
|
), |
|
|
WorkflowStep( |
|
|
step_id="analyze3", |
|
|
agent_name="bonifacio", |
|
|
action="policy_analysis" |
|
|
) |
|
|
] |
|
|
else: |
|
|
steps = [ |
|
|
WorkflowStep( |
|
|
step_id="step1", |
|
|
agent_name="zumbi", |
|
|
action="analyze" |
|
|
) |
|
|
] |
|
|
|
|
|
workflow = WorkflowDefinition( |
|
|
workflow_id=f"{pattern}_{uuid4()}", |
|
|
name=f"{pattern} workflow", |
|
|
pattern=pattern_enum, |
|
|
steps=steps |
|
|
) |
|
|
|
|
|
orchestrator._workflows[workflow.workflow_id] = workflow |
|
|
|
|
|
|
|
|
context = AgentContext( |
|
|
investigation_id=str(uuid4()), |
|
|
user_id=current_user.id, |
|
|
session_id=str(uuid4()), |
|
|
metadata={"pattern": pattern} |
|
|
) |
|
|
|
|
|
|
|
|
result = await orchestrator.execute_workflow( |
|
|
workflow.workflow_id, |
|
|
data, |
|
|
context |
|
|
) |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"pattern": pattern, |
|
|
"result": result |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error executing pattern workflow: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
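# Usage note: the path parameter must be a valid OrchestrationPattern value (the branches
# above special-case "map_reduce" and "fan_out_fan_in"; anything else falls back to a
# single-step workflow), and the JSON body is passed through unchanged as the workflow's
# initial data. The steps used here are fixed demonstration steps, not user-defined ones.

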
@router.get("/workflows") |
|
|
async def list_workflows( |
|
|
current_user: User = Depends(get_current_user), |
|
|
orchestrator: AgentOrchestrator = Depends(get_orchestrator) |
|
|
): |
|
|
"""List all registered workflows.""" |
|
|
try: |
|
|
workflows = [] |
|
|
for workflow_id, workflow in orchestrator._workflows.items(): |
|
|
workflows.append({ |
|
|
"workflow_id": workflow_id, |
|
|
"name": workflow.name, |
|
|
"pattern": workflow.pattern.value, |
|
|
"steps": len(workflow.steps), |
|
|
"timeout": workflow.timeout |
|
|
}) |
|
|
|
|
|
return { |
|
|
"status": "success", |
|
|
"total_workflows": len(workflows), |
|
|
"workflows": workflows |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error listing workflows: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |