anderson-ufrj
feat(chat): enable Zumbi agent with dados.gov.br integration in chat flow
4a7ce2c
"""
Chat API endpoints for conversational interface
VERSION: 2025-09-20 13:45:00 - Lazy initialization fix
"""
print("=== CHAT.PY LOADING - VERSION 13:45:00 ===")
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
import asyncio
from src.core import json_utils
import uuid
from datetime import datetime
from src.core import get_logger
from src.core.exceptions import ValidationError
from src.api.dependencies import get_current_optional_user
from src.api.routes.chat_drummond_factory import get_drummond_agent
from src.api.routes.chat_zumbi_integration import run_zumbi_investigation, format_investigation_message
from src.agents.deodoro import AgentMessage, AgentContext, AgentResponse, AgentStatus
from src.agents.abaporu import MasterAgent
from src.services.chat_service import IntentDetector, IntentType
from src.api.models.pagination import CursorPaginationResponse
from src.services.chat_data_integration import chat_data_integration
# Minimal local definitions of the search models (defined here instead of being imported)
class DataSourceType:
    """Plain string constants naming the supported government data sources."""

    # Each value is the Portuguese key for the corresponding data source.
    CONTRACTS: str = "contratos"
    SERVANTS: str = "servidores"
    EXPENSES: str = "despesas"
    BIDDINGS: str = "licitacoes"
class UniversalSearchRequest(BaseModel):
    """Universal search request model (local definition, not imported)."""
    # Free-text search query.
    query: str
    # Data source key; presumably one of the DataSourceType values — confirm with callers.
    data_source: str
    # Additional filters; a fresh empty dict is created per instance via default_factory.
    filters: Dict[str, Any] = Field(default_factory=dict)
    # Maximum number of results to request (defaults to 100).
    max_results: int = Field(default=100)
# Router and logger shared by every endpoint defined in this module.
router = APIRouter(tags=["chat"])
logger = get_logger(__name__)

# Placeholder for a simple fallback agent meant to be imported lazily if needed.
# NOTE(review): nothing visible in this module assigns or reads it.
enhanced_zumbi = None
# Import chat service with error handling. On any failure (import error or an
# exception from the factory) the module keeps loading and chat_service is None.
try:
    from src.services.chat_service_with_cache import chat_service
    # The module-level instance may be None until created; build it via the factory.
    if chat_service is None:
        from src.services.chat_service_with_cache import get_chat_service
        chat_service = get_chat_service()
except Exception as e:
    logger.warning(f"Failed to import chat_service: {e}")
    chat_service = None
# Agents are created lazily (see get_master_agent) so that importing this module
# does not fail when an agent's dependencies are unavailable.
master_agent = None

# Intent detector shared by the chat endpoints.
intent_detector = IntentDetector()
def get_master_agent():
    """
    Return the shared MasterAgent, or None when it has not been created.

    Construction is currently skipped because MasterAgent requires an
    llm_service and a memory_agent that this module does not provide; a
    warning is logged each time the instance is missing.
    """
    global master_agent
    if master_agent is not None:
        return master_agent

    try:
        logger.warning("MasterAgent initialization skipped - dependencies not available")
    except Exception as exc:
        logger.error(f"Failed to initialize MasterAgent: {type(exc).__name__}: {exc}")
    return None
class ChatRequest(BaseModel):
    """Chat message request."""
    # User text; validated to be between 1 and 1000 characters.
    message: str = Field(..., min_length=1, max_length=1000)
    # Session to continue; the /message endpoint generates a new UUID when omitted.
    session_id: Optional[str] = None
    # Optional free-form context forwarded to the agents and the Portal data integration.
    context: Optional[Dict[str, Any]] = None
class ChatResponse(BaseModel):
    """Chat message response (response model of the /message endpoint)."""
    # Session the exchange belongs to.
    session_id: str
    # Identifier of the responding agent, e.g. "drummond", "zumbi" or "system".
    agent_id: str
    # Display name of the responding agent.
    agent_name: str
    # Reply text shown to the user.
    message: str
    # Confidence score reported for the reply.
    confidence: float
    # Follow-up actions to offer the user, if any.
    suggested_actions: Optional[List[str]] = None
    # Extra input requested from the user, if any.
    requires_input: Optional[Dict[str, str]] = None
    # Auxiliary data (intent type, timestamps, Portal data summary, ...).
    metadata: Dict[str, Any] = {}
class QuickAction(BaseModel):
    """Quick action suggestion for the chat."""
    # Stable identifier of the action (e.g. "investigate").
    id: str
    # Human-readable label.
    label: str
    # Icon name (e.g. "search"); presumably resolved by the frontend — confirm.
    icon: str
    # Message text associated with the action; presumably sent as the user's message.
    action: str
@router.post("/message", response_model=ChatResponse)
async def send_message(
request: ChatRequest,
current_user = Depends(get_current_optional_user)
) -> ChatResponse:
"""
Process a chat message and return agent response
"""
try:
# Check if chat service is available
if chat_service is None:
logger.error("Chat service not available")
return {
"response": "Desculpe, o serviço de chat está temporariamente indisponível.",
"session_id": request.session_id or str(uuid.uuid4()),
"message_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"intent": None
}
# Get or create session
session_id = request.session_id or str(uuid.uuid4())
session = await chat_service.get_or_create_session(
session_id,
user_id=current_user.id if current_user else None
)
# Detect intent from message
intent = await intent_detector.detect(request.message)
logger.info(f"Detected intent: {intent.type} with confidence {intent.confidence}")
# Check if user is asking for specific government data
portal_data = None
message_lower = request.message.lower()
data_keywords = ["contratos", "gastos", "despesas", "licitação", "fornecedor", "servidor",
"órgão", "ministério", "prefeitura", "cnpj", "valor", "empresa"]
should_fetch_data = any(keyword in message_lower for keyword in data_keywords)
# If user is asking for data and intent suggests investigation/analysis
if should_fetch_data and intent.type in [IntentType.INVESTIGATE, IntentType.ANALYZE, IntentType.UNKNOWN]:
try:
logger.info(f"Fetching real data from Portal da Transparência for query: {request.message}")
portal_result = await chat_data_integration.process_user_query(request.message, request.context)
if portal_result and portal_result.get("data"):
portal_data = portal_result
logger.info(f"Found {portal_result.get('data', {}).get('total', 0)} records from Portal da Transparência")
except Exception as e:
logger.warning(f"Portal da Transparência integration failed: {e}")
# Determine target agent based on intent
if intent.type in [IntentType.GREETING, IntentType.CONVERSATION, IntentType.HELP_REQUEST,
IntentType.ABOUT_SYSTEM, IntentType.SMALLTALK, IntentType.THANKS, IntentType.GOODBYE]:
target_agent = "drummond"
logger.info(f"Routing to Drummond for intent type: {intent.type}")
elif intent.type == IntentType.INVESTIGATE:
target_agent = "abaporu"
logger.info(f"Routing to Abaporu for intent type: {intent.type}")
else:
target_agent = "abaporu" # Default to master agent
logger.info(f"Defaulting to Abaporu for intent type: {intent.type}")
# Create agent message with Portal data if available
payload_data = {
"user_message": request.message,
"intent": intent.dict(),
"context": request.context or {},
"session": session.to_dict()
}
if portal_data:
payload_data["portal_data"] = portal_data
agent_message = AgentMessage(
sender="user",
recipient=target_agent,
action="process_chat",
payload=payload_data,
context={
"investigation_id": session.current_investigation_id,
"user_id": session.user_id,
"session_id": session_id
}
)
# Create agent context
agent_context = AgentContext(
investigation_id=session.current_investigation_id,
user_id=session.user_id,
session_id=session_id
)
# Route to appropriate agent based on intent
logger.info(f"Target agent: {target_agent}")
if target_agent == "drummond":
# Use Drummond for conversational intents
drummond_loaded = False
try:
drummond_agent = await get_drummond_agent()
if drummond_agent:
response = await drummond_agent.process(agent_message, agent_context)
drummond_loaded = True
else:
raise Exception("Drummond agent not available")
agent_id = "drummond"
agent_name = "Carlos Drummond de Andrade"
logger.info(f"Drummond response received: {response}")
except Exception as e:
logger.error(f"Error processing with Drummond: {e}")
import traceback
traceback.print_exc()
# Fall through to simple responses
# If Drummond failed, use simple responses
if not drummond_loaded:
logger.info("Using fallback responses for conversational intents")
# Simple responses based on intent
if intent.type == IntentType.GREETING:
message = "Olá! Sou o Cidadão.AI. Como posso ajudá-lo com transparência governamental?"
elif intent.type == IntentType.HELP_REQUEST:
message = "Posso ajudar você a investigar contratos, analisar gastos públicos e detectar anomalias. Experimente perguntar 'quero investigar contratos da saúde'!"
elif intent.type == IntentType.ABOUT_SYSTEM:
message = "O Cidadão.AI é um sistema multi-agente para análise de transparência governamental. Temos agentes especializados em investigação, análise e geração de relatórios."
elif intent.type == IntentType.THANKS:
message = "De nada! Estou sempre aqui para ajudar com a transparência pública."
elif intent.type == IntentType.GOODBYE:
message = "Até logo! Volte sempre que precisar de informações sobre gastos públicos."
else:
message = "Interessante sua pergunta! Posso ajudá-lo a investigar contratos ou analisar gastos públicos. O que gostaria de explorar?"
response = AgentResponse(
agent_name="Drummond (Simplificado)",
status=AgentStatus.COMPLETED,
result={
"message": message,
"intent_type": intent.type.value,
"status": "fallback"
},
metadata={
"confidence": 0.8,
"simplified": True
}
)
agent_id = "drummond"
agent_name = "Carlos Drummond de Andrade"
elif target_agent == "abaporu" and intent.type == IntentType.INVESTIGATE:
# Handle investigation requests with Zumbi
try:
logger.info("Routing investigation to Zumbi agent")
# Extract what to investigate from the message
search_query = request.message.lower()
data_source = DataSourceType.CONTRACTS # Default
# Detect data source from keywords
if any(word in search_query for word in ["servidor", "salário", "funcionário"]):
data_source = DataSourceType.SERVANTS
elif any(word in search_query for word in ["despesa", "gasto", "pagamento"]):
data_source = DataSourceType.EXPENSES
elif any(word in search_query for word in ["licitação", "pregão"]):
data_source = DataSourceType.BIDDINGS
# Create investigation request
investigation_request = UniversalSearchRequest(
query=request.message,
data_source=data_source,
max_results=10
)
# Run investigation with Zumbi agent
logger.info("Running Zumbi investigation with dados.gov.br integration")
# Extract organization codes from intent entities if available
org_codes = None
if intent.entities:
orgs = [e.value for e in intent.entities if e.type == "organization"]
if orgs:
# Map organization names to codes if needed
org_codes = orgs # For now, use as-is
# Run investigation with dados.gov.br enabled
investigation_result = await run_zumbi_investigation(
query=request.message,
organization_codes=org_codes,
enable_open_data=True, # Always enable dados.gov.br search
session_id=session_id,
user_id=session.user_id
)
if investigation_result["status"] == "error":
# Return error response
return ChatResponse(
session_id=session_id,
agent_id="zumbi",
agent_name="Zumbi dos Palmares",
message=f"❌ Erro na investigação: {investigation_result['error']}",
confidence=0.0,
metadata={"status": "error", "error": investigation_result["error"]}
)
# Format response using the integration helper
message = format_investigation_message(investigation_result)
# Add suggested actions if anomalies were found
suggested_actions = []
if investigation_result["anomalies_found"] > 0:
suggested_actions.append("🔍 Ver detalhes das anomalias")
suggested_actions.append("📊 Gerar relatório completo")
if investigation_result.get("open_data_available"):
suggested_actions.append("📂 Explorar dados abertos relacionados")
else:
suggested_actions.append("🔎 Refinar busca")
suggested_actions.append("📈 Analisar outros períodos")
response = AgentResponse(
agent_name="Zumbi dos Palmares",
status=AgentStatus.COMPLETED,
result={
"message": message,
"investigation_summary": {
"anomalies_found": investigation_result["anomalies_found"],
"records_analyzed": investigation_result["records_analyzed"],
"open_data_available": investigation_result.get("open_data_available", False),
"datasets_count": len(investigation_result.get("related_datasets", []))
},
"status": "completed",
"suggested_actions": suggested_actions
},
metadata={
"confidence": 0.9,
"investigation_id": session.current_investigation_id,
"dados_gov_enabled": True
}
)
agent_id = "zumbi"
agent_name = "Zumbi dos Palmares"
except Exception as e:
logger.error(f"Investigation error: {e}")
response = AgentResponse(
agent_name="Sistema",
status=AgentStatus.ERROR,
result={
"message": "Erro ao processar investigação. Por favor, tente novamente.",
"error": str(e)
},
metadata={"confidence": 0.0}
)
agent_id = "system"
agent_name = "Sistema"
else:
# For now, return a simple response if agents are not available
logger.warning(f"Falling back to maintenance message. Target: {target_agent}")
# Include debug info about why Drummond failed
debug_info = ""
if target_agent == "drummond":
debug_info = " (Drummond not initialized)"
response = AgentResponse(
agent_name="Sistema",
status=AgentStatus.COMPLETED,
result={
"message": f"Desculpe, estou em manutenção. Por favor, tente novamente em alguns instantes.{debug_info}",
"status": "maintenance",
"debug": "Drummond not available" if target_agent == "drummond" else None
},
metadata={
"confidence": 0.0
}
)
agent_id = "system"
agent_name = "Sistema"
# Save to chat history
await chat_service.save_message(
session_id=session_id,
role="user",
content=request.message
)
# Get content from response
response_content = response.result if hasattr(response, 'result') else str(response)
await chat_service.save_message(
session_id=session_id,
role="assistant",
content=response_content,
agent_id=agent_id
)
# Prepare suggested actions based on response
suggested_actions = []
# Check if response has custom suggested actions
if hasattr(response, 'result') and isinstance(response.result, dict):
custom_actions = response.result.get("suggested_actions", [])
if custom_actions:
suggested_actions = custom_actions
# Fall back to default actions if no custom ones
if not suggested_actions:
if intent.type == IntentType.INVESTIGATE and not session.current_investigation_id:
suggested_actions = ["start_investigation", "view_examples", "learn_more"]
elif session.current_investigation_id:
suggested_actions = ["view_progress", "generate_report", "new_investigation"]
# Extract message from response
if hasattr(response, 'result') and isinstance(response.result, dict):
message_text = response.result.get("message", str(response.result))
requires_input = response.result.get("requires_input")
elif hasattr(response, 'result'):
message_text = str(response.result)
requires_input = None
else:
message_text = str(response)
requires_input = None
# If we have Portal data and no agent processed it, use the Portal response
if portal_data and portal_data.get("response") and agent_id == "system":
message_text = portal_data["response"]
agent_id = "portal_transparencia"
agent_name = "Portal da Transparência"
# Build metadata
metadata = {
"intent_type": intent.type.value,
"processing_time": response.metadata.get("processing_time", 0) if hasattr(response, 'metadata') else 0,
"is_demo_mode": not bool(current_user),
"timestamp": datetime.utcnow().isoformat()
}
# Add Portal da Transparência data to metadata if available
if portal_data:
metadata["portal_data"] = {
"type": portal_data.get("data_type"),
"entities_found": portal_data.get("entities", {}),
"total_records": portal_data.get("data", {}).get("total", 0) if portal_data.get("data") else 0,
"has_data": bool(portal_data.get("data"))
}
return ChatResponse(
session_id=session_id,
agent_id=agent_id,
agent_name=agent_name,
message=message_text,
confidence=response.metadata.get("confidence", intent.confidence) if hasattr(response, 'metadata') else intent.confidence,
suggested_actions=suggested_actions,
requires_input=requires_input,
metadata=metadata
)
except Exception as e:
logger.error(f"Chat error: {str(e)}")
raise HTTPException(status_code=500, detail="Erro ao processar mensagem")
@router.post("/stream")
async def stream_message(request: ChatRequest):
"""
Stream chat response using Server-Sent Events (SSE)
"""
async def generate():
try:
# Send initial event
yield f"data: {json_utils.dumps({'type': 'start', 'timestamp': datetime.utcnow().isoformat()})}\n\n"
# Detect intent
yield f"data: {json_utils.dumps({'type': 'detecting', 'message': 'Analisando sua mensagem...'})}\n\n"
await asyncio.sleep(0.5)
intent = await intent_detector.detect(request.message)
yield f"data: {json_utils.dumps({'type': 'intent', 'intent': intent.type.value, 'confidence': intent.confidence})}\n\n"
# Select agent
agent = await chat_service.get_agent_for_intent(intent)
yield f"data: {json_utils.dumps({'type': 'agent_selected', 'agent_id': agent.agent_id, 'agent_name': agent.name})}\n\n"
await asyncio.sleep(0.3)
# Process message in chunks (simulate typing)
response_text = f"Olá! Sou {agent.name} e vou ajudá-lo com sua solicitação sobre {intent.type.value}."
# Send response in chunks
words = response_text.split()
chunk = ""
for i, word in enumerate(words):
chunk += word + " "
if i % 3 == 0: # Send every 3 words
yield f"data: {json_utils.dumps({'type': 'chunk', 'content': chunk.strip()})}\n\n"
chunk = ""
await asyncio.sleep(0.1)
if chunk: # Send remaining words
yield f"data: {json_utils.dumps({'type': 'chunk', 'content': chunk.strip()})}\n\n"
# Send completion
yield f"data: {json_utils.dumps({'type': 'complete', 'suggested_actions': ['start_investigation', 'learn_more']})}\n\n"
except Exception as e:
logger.error(f"Stream error: {str(e)}")
yield f"data: {json_utils.dumps({'type': 'error', 'message': 'Erro ao processar mensagem'})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*"
}
)
@router.get("/suggestions")
async def get_suggestions() -> List[QuickAction]:
"""
Get quick action suggestions for the chat
"""
return [
QuickAction(
id="investigate",
label="Investigar contratos",
icon="search",
action="Quero investigar contratos do Ministério da Saúde"
),
QuickAction(
id="anomalies",
label="Ver anomalias recentes",
icon="alert-circle",
action="Mostre as principais anomalias detectadas"
),
QuickAction(
id="report",
label="Gerar relatório",
icon="file-text",
action="Gere um relatório das últimas investigações"
),
QuickAction(
id="help",
label="Como funciona?",
icon="help-circle",
action="Como o Cidadão.AI funciona?"
)
]
@router.get("/history/{session_id}")
async def get_chat_history(
session_id: str,
limit: int = 50,
current_user = Depends(get_current_optional_user)
) -> Dict[str, Any]:
"""
Get chat history for a session
"""
session = await chat_service.get_session(session_id)
# Verify user has access to this session
if session.user_id and current_user and session.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Acesso negado")
messages = await chat_service.get_session_messages(session_id, limit)
return {
"session_id": session_id,
"messages": messages,
"total_messages": len(messages),
"current_investigation_id": session.current_investigation_id
}
@router.get("/history/{session_id}/paginated")
async def get_chat_history_paginated(
session_id: str,
cursor: Optional[str] = None,
limit: int = 50,
direction: str = "prev",
current_user = Depends(get_current_optional_user)
) -> CursorPaginationResponse[Dict[str, Any]]:
"""
Get paginated chat history using cursor pagination.
This is more efficient for large chat histories and real-time updates.
Args:
session_id: Session identifier
cursor: Pagination cursor from previous request
limit: Number of messages per page (max: 100)
direction: "next" for newer messages, "prev" for older (default)
"""
session = await ChatService.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Sessão não encontrada")
# Verify user has access to this session
if session.user_id and current_user and session.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Acesso negado")
# Get paginated messages
paginated_response = await ChatService.get_session_messages_paginated(
session_id=session_id,
cursor=cursor,
limit=min(limit, 100), # Cap at 100
direction=direction
)
# Add session info to metadata
paginated_response.metadata.update({
"session_id": session_id,
"investigation_id": session.current_investigation_id,
"session_created": session.created_at.isoformat() if session else None
})
return paginated_response
@router.delete("/history/{session_id}")
async def clear_chat_history(
session_id: str,
current_user = Depends(get_current_optional_user)
) -> Dict[str, str]:
"""
Clear chat history for a session
"""
session = await chat_service.get_session(session_id)
# Verify user has access
if session.user_id and current_user and session.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Acesso negado")
await chat_service.clear_session(session_id)
return {"message": "Histórico limpo com sucesso"}
@router.get("/cache/stats")
async def get_cache_stats(
current_user = Depends(get_current_optional_user)
) -> Dict[str, Any]:
"""
Get cache statistics (admin only in production)
"""
try:
stats = await ChatService.get_cache_stats()
return stats
except Exception as e:
logger.error(f"Error getting cache stats: {e}")
return {"error": "Unable to get cache statistics"}
@router.get("/agents")
async def get_available_agents() -> List[Dict[str, Any]]:
"""
Get list of available agents for chat
"""
return [
{
"id": "abaporu",
"name": "Abaporu",
"avatar": "🎨",
"role": "Orquestrador Master",
"description": "Coordena investigações complexas",
"status": "active"
},
{
"id": "zumbi",
"name": "Zumbi dos Palmares",
"avatar": "🔍",
"role": "Investigador",
"description": "Detecta anomalias e irregularidades",
"status": "active"
},
{
"id": "anita",
"name": "Anita Garibaldi",
"avatar": "📊",
"role": "Analista",
"description": "Analisa padrões e tendências",
"status": "active"
},
{
"id": "tiradentes",
"name": "Tiradentes",
"avatar": "📝",
"role": "Relator",
"description": "Gera relatórios detalhados",
"status": "active"
},
{
"id": "machado",
"name": "Machado de Assis",
"avatar": "📚",
"role": "Analista Textual",
"description": "Analisa documentos e contratos",
"status": "active"
},
{
"id": "dandara",
"name": "Dandara",
"avatar": "⚖️",
"role": "Justiça Social",
"description": "Avalia equidade e inclusão",
"status": "active"
}
]
@router.get("/debug/drummond-status")
async def debug_drummond_status():
"""Debug endpoint to check Drummond agent status"""
return {
"drummond_initialized": True, # Factory handles initialization
"drummond_error": None,
"drummond_type": "Factory managed",
"has_process_method": True,
"intent_types_for_drummond": [
"GREETING", "CONVERSATION", "HELP_REQUEST",
"ABOUT_SYSTEM", "SMALLTALK", "THANKS", "GOODBYE"
]
}