"""
Module: tools.data_visualizer
Description: Data visualization utilities for government transparency data
Author: Anderson H. Silva
Date: 2025-01-15
"""
import json
import re
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class DataVisualizer:
"""Create visualizations for government transparency data."""
def __init__(self):
self.color_palette = {
"primary": "#3b82f6",
"secondary": "#10b981",
"warning": "#f59e0b",
"danger": "#ef4444",
"success": "#10b981",
"info": "#6366f1"
}
def _extract_numeric_value(self, value_str: str) -> float:
"""Extract numeric value from currency string."""
try:
if isinstance(value_str, (int, float)):
return float(value_str)
# Remove currency symbols and convert to float
numeric = re.sub(r'[^\d,.-]', '', str(value_str))
numeric = numeric.replace(',', '.')
return float(numeric)
except:
return 0.0
def _format_currency(self, value: float) -> str:
"""Format currency for display."""
if value >= 1_000_000_000:
return f"R$ {value/1_000_000_000:.1f}B"
elif value >= 1_000_000:
return f"R$ {value/1_000_000:.1f}M"
elif value >= 1_000:
return f"R$ {value/1_000:.1f}K"
else:
return f"R$ {value:.2f}"
def create_summary_cards(self, data: Dict[str, Any]) -> str:
"""Create summary cards visualization."""
if not data.get("success") or not data.get("data"):
return ""
items = data.get("data", [])
data_type = data.get("data_type", "unknown")
# Calculate summary statistics
total_items = len(items)
total_value = 0
avg_value = 0
max_value = 0
for item in items:
if data_type == "contracts":
value = self._extract_numeric_value(item.get("value", 0))
elif data_type == "expenses":
value = self._extract_numeric_value(item.get("value", 0))
elif data_type == "biddings":
value = self._extract_numeric_value(item.get("value", 0))
else:
value = 0
total_value += value
max_value = max(max_value, value)
avg_value = total_value / total_items if total_items > 0 else 0
# Create HTML cards
cards_html = f"""
{total_items}
Total de Registros
{self._format_currency(total_value)}
Valor Total
{self._format_currency(avg_value)}
Valor Médio
{self._format_currency(max_value)}
Maior Valor
"""
return cards_html
def create_top_entities_chart(self, data: Dict[str, Any]) -> str:
"""Create top entities chart."""
if not data.get("success") or not data.get("data"):
return ""
items = data.get("data", [])
data_type = data.get("data_type", "unknown")
# Count entities
entity_counts = {}
entity_values = {}
for item in items:
if data_type == "contracts":
entity = item.get("contractor", "Desconhecido")
value = self._extract_numeric_value(item.get("value", 0))
elif data_type == "expenses":
entity = item.get("beneficiary", "Desconhecido")
value = self._extract_numeric_value(item.get("value", 0))
elif data_type == "biddings":
entity = item.get("organ", "Desconhecido")
value = self._extract_numeric_value(item.get("value", 0))
else:
continue
# Truncate long names
if len(entity) > 40:
entity = entity[:37] + "..."
entity_counts[entity] = entity_counts.get(entity, 0) + 1
entity_values[entity] = entity_values.get(entity, 0) + value
# Get top 10 entities by count
top_entities = sorted(entity_counts.items(), key=lambda x: x[1], reverse=True)[:10]
if not top_entities:
return ""
# Create horizontal bar chart
max_count = max(count for _, count in top_entities)
chart_html = f"""
📊 Top 10 {"Contratados" if data_type == "contracts" else "Beneficiários" if data_type == "expenses" else "Órgãos"}
"""
for entity, count in top_entities:
width_percentage = (count / max_count) * 100
total_value = entity_values.get(entity, 0)
chart_html += f"""
{entity}
{count} • {self._format_currency(total_value)}
"""
chart_html += """
"""
return chart_html
def create_risk_indicators(self, risk_analysis: Dict[str, Any]) -> str:
"""Create risk indicators visualization."""
if not risk_analysis:
return ""
risk_score = risk_analysis.get("risk_score", 0)
risk_level = risk_analysis.get("risk_level", "BAIXO")
risk_factors = risk_analysis.get("risk_factors", [])
# Color based on risk level
risk_colors = {
"BAIXO": self.color_palette["success"],
"MÉDIO": self.color_palette["warning"],
"ALTO": self.color_palette["danger"],
"CRÍTICO": "#dc2626"
}
risk_color = risk_colors.get(risk_level, self.color_palette["info"])
# Risk score bar
score_percentage = (risk_score / 10) * 100
risk_html = f"""
🚨 Análise de Risco
{risk_level}
Nível de Risco
{risk_score:.1f}/10
Score de Risco
"""
# Risk factors
if risk_factors:
risk_html += """
Fatores de Risco Identificados:
"""
for factor in risk_factors[:5]: # Show max 5 factors
contract_id = factor.get("contract_id", factor.get("expense_id", "N/A"))
factors_list = factor.get("factors", [])
if factors_list:
risk_html += f"""
ID: {contract_id}
• {' • '.join(factors_list)}
"""
risk_html += "
"
risk_html += """
"""
return risk_html
def create_timeline_chart(self, data: Dict[str, Any]) -> str:
"""Create timeline chart for temporal analysis."""
if not data.get("success") or not data.get("data"):
return ""
items = data.get("data", [])
data_type = data.get("data_type", "unknown")
# Extract dates and values
date_values = {}
for item in items:
try:
if data_type == "contracts":
date_str = item.get("start_date", "")
value = self._extract_numeric_value(item.get("value", 0))
elif data_type == "expenses":
date_str = item.get("date", "")
value = self._extract_numeric_value(item.get("value", 0))
else:
continue
if date_str and date_str != "N/A":
# Parse date
date_obj = datetime.strptime(date_str, "%d/%m/%Y")
month_key = date_obj.strftime("%Y-%m")
if month_key not in date_values:
date_values[month_key] = {"count": 0, "value": 0}
date_values[month_key]["count"] += 1
date_values[month_key]["value"] += value
except:
continue
if not date_values:
return ""
# Sort by date
sorted_dates = sorted(date_values.items())
if len(sorted_dates) < 2:
return ""
# Create timeline
max_value = max(data["value"] for _, data in sorted_dates)
timeline_html = f"""
📈 Linha do Tempo
"""
for month, data in sorted_dates:
height_percentage = (data["value"] / max_value) * 100 if max_value > 0 else 0
# Format month
try:
month_obj = datetime.strptime(month, "%Y-%m")
month_display = month_obj.strftime("%b/%Y")
except:
month_display = month
timeline_html += f"""
{month_display}
{data['count']} • {self._format_currency(data['value'])}
"""
timeline_html += """
"""
return timeline_html
def create_comprehensive_visualization(
self,
data: Dict[str, Any],
risk_analysis: Optional[Dict[str, Any]] = None
) -> str:
"""Create comprehensive visualization combining all charts."""
if not data.get("success"):
return ""
visualization = ""
# Summary cards
visualization += self.create_summary_cards(data)
# Risk indicators
if risk_analysis:
visualization += self.create_risk_indicators(risk_analysis)
# Top entities chart
visualization += self.create_top_entities_chart(data)
# Timeline chart
visualization += self.create_timeline_chart(data)
return visualization
# Factory function
def create_data_visualizer() -> DataVisualizer:
"""Create a data visualizer instance."""
return DataVisualizer()