# File size: 7,555 Bytes (revision 0c769eb)
"""
Integration module for Zumbi agent in chat flow.
This module provides a clean interface to use Zumbi agent from chat
without causing circular imports.
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime
from src.agents.zumbi import InvestigatorAgent, InvestigationRequest
from src.agents.deodoro import AgentContext, AgentStatus
logger = logging.getLogger(__name__)

# Module-level cache for the shared Zumbi agent. It starts as None and is
# filled in by get_zumbi_agent() on first use, so importing this module does
# not by itself construct an InvestigatorAgent.
_zumbi_agent_instance: Optional[InvestigatorAgent] = None
async def get_zumbi_agent() -> InvestigatorAgent:
    """
    Get or create the shared Zumbi agent instance with lazy loading.

    The agent is constructed and initialized on the first call and then
    reused from the module-level cache on every later call.

    Returns:
        The cached, initialized InvestigatorAgent instance.

    Raises:
        Whatever ``InvestigatorAgent()`` or ``initialize()`` raises. In that
        case nothing is cached, so the next call retries from scratch.
    """
    global _zumbi_agent_instance

    if _zumbi_agent_instance is None:
        logger.info("Creating new Zumbi agent instance")
        agent = InvestigatorAgent()
        await agent.initialize()
        # Publish to the cache only after initialize() has succeeded. Storing
        # the instance before the await would let a failed initialization (or
        # a caller arriving during the await) observe a half-initialized agent.
        _zumbi_agent_instance = agent

    return _zumbi_agent_instance
def _investigation_error(error_message: str) -> Dict[str, Any]:
    """Build the payload returned when an investigation could not complete."""
    return {
        "status": "error",
        "error": error_message,
        "anomalies_found": 0,
        "records_analyzed": 0,
    }


async def run_zumbi_investigation(
    query: str,
    organization_codes: Optional[list] = None,
    enable_open_data: bool = True,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Run an investigation using the Zumbi agent and flatten the result for chat.

    Any ``Exception`` raised along the way is logged (with traceback) and
    converted into an error payload instead of propagating to the caller.

    Args:
        query: Investigation query.
        organization_codes: Optional organization codes forwarded to the agent.
        enable_open_data: Enable dados.gov.br enrichment and collect the
            related dataset titles found in the anomalies' evidence.
        session_id: Session ID, used as the correlation id ("default" if unset).
        user_id: User ID ("anonymous" if unset).

    Returns:
        On success, a dict with ``status == "completed"`` plus
        ``anomalies_found``, ``records_analyzed``, ``anomalies``, ``summary``,
        ``open_data_available`` and ``related_datasets`` (unique titles in
        first-seen order). On failure, a dict with ``status == "error"``,
        ``error``, and zeroed ``anomalies_found`` / ``records_analyzed``.
    """
    try:
        agent = await get_zumbi_agent()

        investigation_request = InvestigationRequest(
            query=query,
            organization_codes=organization_codes,
            max_records=50,
            enable_open_data_enrichment=enable_open_data,
            anomaly_types=["price_anomaly", "vendor_concentration", "temporal_patterns"],
        )

        context = AgentContext(
            investigation_id=f"chat_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            user_id=user_id or "anonymous",
            correlation_id=session_id or "default",
        )

        message = {
            "action": "investigate",
            "payload": investigation_request.model_dump(),
        }

        logger.info(f"Starting Zumbi investigation: {query}")
        response = await agent.process(message, context)

        if response.status != AgentStatus.COMPLETED:
            logger.error(f"Zumbi investigation failed: {response.error}")
            return _investigation_error(response.error or "Investigation failed")

        result = response.result
        # `or {}` / `or []` also cover keys that are present but set to None,
        # which a plain .get(key, default) would pass through unchanged.
        metadata = result.get("metadata") or {}
        investigation_data = {
            "status": "completed",
            "anomalies_found": metadata.get("anomalies_detected", 0),
            "records_analyzed": metadata.get("records_analyzed", 0),
            "anomalies": result.get("anomalies") or [],
            "summary": result.get("summary") or {},
            "open_data_available": False,
            "related_datasets": [],
        }

        if enable_open_data:
            # A dict is used as an ordered set: titles are de-duplicated but
            # keep first-seen order, so the datasets shown to the user are
            # stable across runs (a plain set has no defined order).
            dataset_titles: Dict[str, None] = {}
            for anomaly in investigation_data["anomalies"]:
                evidence = anomaly.get("evidence") or {}
                if evidence.get("_open_data_available"):
                    investigation_data["open_data_available"] = True
                    for dataset in evidence.get("_related_datasets") or []:
                        dataset_titles[dataset.get("title", "Unknown")] = None
            investigation_data["related_datasets"] = list(dataset_titles)

        return investigation_data

    except Exception as e:
        # logger.exception records the traceback; the chat flow still receives
        # a well-formed error payload rather than an exception.
        logger.exception("Error in Zumbi investigation: %s", e)
        return _investigation_error(str(e))
def format_investigation_message(investigation_data: Dict[str, Any]) -> str:
    """
    Format investigation results as a Markdown chat message (Portuguese text).

    Optional parts of the payload (``anomalies``, ``summary``,
    ``related_datasets``, and per-anomaly ``severity`` / ``anomaly_type`` /
    ``evidence``) may be missing or ``None``; they are treated as empty
    rather than raising.

    Args:
        investigation_data: Investigation results. Must contain ``status``.
            When ``status`` is ``"error"`` only ``error`` is read.

    Returns:
        The formatted message. At most the first five anomalies and the first
        three related dataset titles are listed.
    """
    if investigation_data["status"] == "error":
        return f"❌ Erro na investigação: {investigation_data.get('error', 'Erro desconhecido')}"

    records_analyzed = investigation_data.get("records_analyzed") or 0
    anomalies_found = investigation_data.get("anomalies_found") or 0
    anomalies = investigation_data.get("anomalies") or []
    related_datasets = investigation_data.get("related_datasets") or []
    has_open_data = bool(investigation_data.get("open_data_available"))

    # Collect fragments and join once at the end instead of repeated `+=`.
    parts = [
        "🏹 **Investigação Concluída**\n\n",
        "📊 **Resumo da Análise:**\n",
        f"• Registros analisados: {records_analyzed}\n",
        f"• Anomalias detectadas: {anomalies_found}\n",
    ]

    if has_open_data:
        parts.append(f"• 📂 Datasets abertos encontrados: {len(related_datasets)}\n")
    parts.append("\n")

    if anomalies_found > 0:
        parts.append("⚠️ **Anomalias Detectadas:**\n")
        for i, anomaly in enumerate(anomalies[:5], 1):
            # A missing or None severity is shown as 0 (lowest band).
            severity = anomaly.get("severity") or 0
            if severity > 0.7:
                severity_emoji = "🔴"
            elif severity > 0.4:
                severity_emoji = "🟡"
            else:
                severity_emoji = "🟢"

            anomaly_type = anomaly.get("anomaly_type") or "Unknown"
            title = anomaly_type.replace("_", " ").title()
            parts.append(f"\n{severity_emoji} **{i}. {title}**\n")
            parts.append(f" • Severidade: {severity:.2f}\n")
            parts.append(f" • {anomaly.get('description', 'Sem descrição')}\n")

            evidence = anomaly.get("evidence") or {}
            if evidence.get("_related_datasets"):
                parts.append(" • 📂 Dados abertos relacionados disponíveis\n")
    else:
        parts.append("✅ Nenhuma anomalia significativa foi detectada nos dados analisados.\n")

    summary = investigation_data.get("summary") or {}
    if summary:
        parts.append("\n📈 **Estatísticas:**\n")
        if "total_value" in summary:
            parts.append(f"• Valor total analisado: R$ {summary['total_value']:,.2f}\n")
        if "organizations_count" in summary:
            parts.append(f"• Organizações: {summary['organizations_count']}\n")
        if "suppliers_count" in summary:
            parts.append(f"• Fornecedores: {summary['suppliers_count']}\n")

    if has_open_data:
        parts.append("\n💡 **Dados Abertos Disponíveis:**\n")
        parts.append(
            f"Encontramos {len(related_datasets)} conjuntos de dados relacionados no dados.gov.br "
        )
        parts.append("que podem fornecer informações adicionais para sua análise.\n")
        for dataset in related_datasets[:3]:
            parts.append(f"• {dataset}\n")

    return "".join(parts)