|
|
""" |
|
|
Module: cli.commands.watch |
|
|
Description: Real-time monitoring command for CLI |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-25 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timedelta |
|
|
from pathlib import Path |
|
|
from typing import Optional, List, Dict, Any, Set |
|
|
from enum import Enum |
|
|
import signal |
|
|
import sys |
|
|
|
|
|
import typer |
|
|
from rich.console import Console |
|
|
from rich.live import Live |
|
|
from rich.table import Table |
|
|
from rich.panel import Panel |
|
|
from rich.layout import Layout |
|
|
from rich.text import Text |
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn |
|
|
import httpx |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
app = typer.Typer(help="Monitor government data in real-time for anomalies") |
|
|
console = Console() |
|
|
|
|
|
|
|
|
shutdown_requested = False |
|
|
|
|
|
|
|
|
class MonitoringMode(str, Enum): |
|
|
"""Monitoring mode options.""" |
|
|
CONTRACTS = "contracts" |
|
|
ORGANIZATIONS = "organizations" |
|
|
SUPPLIERS = "suppliers" |
|
|
ANOMALIES = "anomalies" |
|
|
ALL = "all" |
|
|
|
|
|
|
|
|
class AlertLevel(str, Enum): |
|
|
"""Alert level options.""" |
|
|
LOW = "low" |
|
|
MEDIUM = "medium" |
|
|
HIGH = "high" |
|
|
CRITICAL = "critical" |
|
|
|
|
|
|
|
|
class MonitoringConfig(BaseModel): |
|
|
"""Monitoring configuration.""" |
|
|
mode: MonitoringMode |
|
|
organizations: List[str] = Field(default_factory=list) |
|
|
suppliers: List[str] = Field(default_factory=list) |
|
|
categories: List[str] = Field(default_factory=list) |
|
|
min_value: Optional[float] = None |
|
|
anomaly_threshold: float = 0.7 |
|
|
alert_level: AlertLevel = AlertLevel.MEDIUM |
|
|
check_interval: int = 60 |
|
|
|
|
|
|
|
|
class MonitoringStats(BaseModel): |
|
|
"""Monitoring statistics.""" |
|
|
start_time: datetime |
|
|
checks_performed: int = 0 |
|
|
anomalies_detected: int = 0 |
|
|
alerts_triggered: int = 0 |
|
|
last_check: Optional[datetime] = None |
|
|
active_alerts: List[Dict[str, Any]] = Field(default_factory=list) |
|
|
|
|
|
|
|
|
async def call_api( |
|
|
endpoint: str, |
|
|
method: str = "GET", |
|
|
data: Optional[Dict[str, Any]] = None, |
|
|
params: Optional[Dict[str, Any]] = None, |
|
|
auth_token: Optional[str] = None |
|
|
) -> Dict[str, Any]: |
|
|
"""Make API call to backend.""" |
|
|
api_url = "http://localhost:8000" |
|
|
|
|
|
headers = { |
|
|
"Content-Type": "application/json", |
|
|
"User-Agent": "Cidadao.AI-CLI/1.0" |
|
|
} |
|
|
|
|
|
if auth_token: |
|
|
headers["Authorization"] = f"Bearer {auth_token}" |
|
|
|
|
|
async with httpx.AsyncClient() as client: |
|
|
response = await client.request( |
|
|
method=method, |
|
|
url=f"{api_url}{endpoint}", |
|
|
headers=headers, |
|
|
json=data, |
|
|
params=params, |
|
|
timeout=30.0 |
|
|
) |
|
|
|
|
|
if response.status_code >= 400: |
|
|
error_detail = response.json().get("detail", "Unknown error") |
|
|
raise Exception(f"API Error: {error_detail}") |
|
|
|
|
|
return response.json() |
|
|
|
|
|
|
|
|
def create_dashboard_layout() -> Layout: |
|
|
"""Create dashboard layout.""" |
|
|
layout = Layout() |
|
|
|
|
|
layout.split_column( |
|
|
Layout(name="header", size=3), |
|
|
Layout(name="main"), |
|
|
Layout(name="footer", size=4) |
|
|
) |
|
|
|
|
|
layout["main"].split_row( |
|
|
Layout(name="stats", ratio=1), |
|
|
Layout(name="alerts", ratio=2) |
|
|
) |
|
|
|
|
|
return layout |
|
|
|
|
|
|
|
|
def render_header(config: MonitoringConfig) -> Panel: |
|
|
"""Render header panel.""" |
|
|
header_text = Text() |
|
|
header_text.append("👀 Cidadão.AI Watch Mode", style="bold blue") |
|
|
header_text.append("\n") |
|
|
header_text.append(f"Mode: {config.mode.value} | ", style="dim") |
|
|
header_text.append(f"Threshold: {config.anomaly_threshold} | ", style="dim") |
|
|
header_text.append(f"Interval: {config.check_interval}s", style="dim") |
|
|
|
|
|
return Panel(header_text, border_style="blue") |
|
|
|
|
|
|
|
|
def render_stats(stats: MonitoringStats) -> Panel: |
|
|
"""Render statistics panel.""" |
|
|
elapsed = datetime.now() - stats.start_time |
|
|
hours, remainder = divmod(int(elapsed.total_seconds()), 3600) |
|
|
minutes, seconds = divmod(remainder, 60) |
|
|
|
|
|
stats_table = Table(show_header=False, box=None) |
|
|
stats_table.add_column("Label", style="dim") |
|
|
stats_table.add_column("Value", justify="right") |
|
|
|
|
|
stats_table.add_row("Running for", f"{hours:02d}:{minutes:02d}:{seconds:02d}") |
|
|
stats_table.add_row("Checks", str(stats.checks_performed)) |
|
|
stats_table.add_row("Anomalies", str(stats.anomalies_detected)) |
|
|
stats_table.add_row("Alerts", str(stats.alerts_triggered)) |
|
|
|
|
|
if stats.last_check: |
|
|
time_since = (datetime.now() - stats.last_check).total_seconds() |
|
|
stats_table.add_row("Last check", f"{int(time_since)}s ago") |
|
|
|
|
|
return Panel(stats_table, title="📊 Statistics", border_style="green") |
|
|
|
|
|
|
|
|
def render_alerts(stats: MonitoringStats) -> Panel: |
|
|
"""Render alerts panel.""" |
|
|
if not stats.active_alerts: |
|
|
content = Text("No active alerts", style="dim italic") |
|
|
else: |
|
|
alerts_table = Table(show_header=True, header_style="bold") |
|
|
alerts_table.add_column("Time", width=8) |
|
|
alerts_table.add_column("Level", width=8) |
|
|
alerts_table.add_column("Type", width=15) |
|
|
alerts_table.add_column("Description", width=40) |
|
|
|
|
|
|
|
|
for alert in stats.active_alerts[-10:]: |
|
|
level = alert.get("level", "unknown") |
|
|
level_color = { |
|
|
"low": "green", |
|
|
"medium": "yellow", |
|
|
"high": "red", |
|
|
"critical": "bold red" |
|
|
}.get(level, "white") |
|
|
|
|
|
time_str = datetime.fromisoformat(alert["timestamp"]).strftime("%H:%M:%S") |
|
|
|
|
|
alerts_table.add_row( |
|
|
time_str, |
|
|
f"[{level_color}]{level.upper()}[/{level_color}]", |
|
|
alert.get("type", "Unknown"), |
|
|
alert.get("description", "N/A")[:40] |
|
|
) |
|
|
|
|
|
content = alerts_table |
|
|
|
|
|
return Panel(content, title="🚨 Active Alerts", border_style="yellow") |
|
|
|
|
|
|
|
|
def render_footer() -> Panel: |
|
|
"""Render footer panel.""" |
|
|
footer_text = Text() |
|
|
footer_text.append("Press ", style="dim") |
|
|
footer_text.append("Ctrl+C", style="bold yellow") |
|
|
footer_text.append(" to stop monitoring", style="dim") |
|
|
|
|
|
return Panel(footer_text, border_style="dim") |
|
|
|
|
|
|
|
|
async def check_for_anomalies( |
|
|
config: MonitoringConfig, |
|
|
stats: MonitoringStats, |
|
|
auth_token: Optional[str] = None |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Check for anomalies based on monitoring mode.""" |
|
|
new_alerts = [] |
|
|
|
|
|
try: |
|
|
|
|
|
query_params = { |
|
|
"threshold": config.anomaly_threshold, |
|
|
"limit": 50 |
|
|
} |
|
|
|
|
|
if config.organizations: |
|
|
query_params["organizations"] = ",".join(config.organizations) |
|
|
if config.suppliers: |
|
|
query_params["suppliers"] = ",".join(config.suppliers) |
|
|
if config.categories: |
|
|
query_params["categories"] = ",".join(config.categories) |
|
|
if config.min_value: |
|
|
query_params["min_value"] = config.min_value |
|
|
|
|
|
|
|
|
if config.mode == MonitoringMode.CONTRACTS: |
|
|
|
|
|
contracts = await call_api( |
|
|
"/api/v1/data/contracts/recent", |
|
|
params=query_params, |
|
|
auth_token=auth_token |
|
|
) |
|
|
|
|
|
|
|
|
for contract in contracts: |
|
|
value = contract.get("value", 0) |
|
|
if config.min_value and value >= config.min_value: |
|
|
new_alerts.append({ |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"level": "high" if value > config.min_value * 2 else "medium", |
|
|
"type": "high_value", |
|
|
"description": f"Contract {contract['id']} with value R$ {value:,.2f}", |
|
|
"data": contract |
|
|
}) |
|
|
|
|
|
elif config.mode == MonitoringMode.ANOMALIES: |
|
|
|
|
|
anomalies = await call_api( |
|
|
"/api/v1/investigations/anomalies/recent", |
|
|
params=query_params, |
|
|
auth_token=auth_token |
|
|
) |
|
|
|
|
|
for anomaly in anomalies: |
|
|
severity = anomaly.get("severity", 0) |
|
|
if severity >= config.anomaly_threshold: |
|
|
level = ( |
|
|
"critical" if severity >= 0.9 else |
|
|
"high" if severity >= 0.8 else |
|
|
"medium" if severity >= 0.7 else |
|
|
"low" |
|
|
) |
|
|
|
|
|
new_alerts.append({ |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"level": level, |
|
|
"type": anomaly.get("type", "unknown"), |
|
|
"description": anomaly.get("description", "Anomaly detected"), |
|
|
"data": anomaly |
|
|
}) |
|
|
|
|
|
|
|
|
stats.checks_performed += 1 |
|
|
stats.last_check = datetime.now() |
|
|
|
|
|
if new_alerts: |
|
|
stats.anomalies_detected += len(new_alerts) |
|
|
stats.alerts_triggered += len([a for a in new_alerts if a["level"] in ["high", "critical"]]) |
|
|
stats.active_alerts.extend(new_alerts) |
|
|
|
|
|
|
|
|
if len(stats.active_alerts) > 100: |
|
|
stats.active_alerts = stats.active_alerts[-100:] |
|
|
|
|
|
return new_alerts |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
error_alert = { |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"level": "medium", |
|
|
"type": "error", |
|
|
"description": f"Check failed: {str(e)}", |
|
|
"data": {} |
|
|
} |
|
|
stats.active_alerts.append(error_alert) |
|
|
return [error_alert] |
|
|
|
|
|
|
|
|
def setup_signal_handlers(): |
|
|
"""Setup signal handlers for graceful shutdown.""" |
|
|
global shutdown_requested |
|
|
|
|
|
def signal_handler(sig, frame): |
|
|
global shutdown_requested |
|
|
shutdown_requested = True |
|
|
console.print("\n[yellow]Shutdown requested... finishing current check[/yellow]") |
|
|
|
|
|
signal.signal(signal.SIGINT, signal_handler) |
|
|
signal.signal(signal.SIGTERM, signal_handler) |
|
|
|
|
|
|
|
|
@app.command() |
|
|
def watch( |
|
|
mode: MonitoringMode = typer.Argument(help="What to monitor"), |
|
|
organizations: Optional[List[str]] = typer.Option(None, "--org", "-o", help="Organization codes to monitor"), |
|
|
suppliers: Optional[List[str]] = typer.Option(None, "--supplier", "-s", help="Supplier names to monitor"), |
|
|
categories: Optional[List[str]] = typer.Option(None, "--category", "-c", help="Contract categories to monitor"), |
|
|
min_value: Optional[float] = typer.Option(None, "--min-value", help="Minimum value threshold for alerts"), |
|
|
threshold: float = typer.Option(0.7, "--threshold", "-t", min=0.0, max=1.0, help="Anomaly detection threshold"), |
|
|
alert_level: AlertLevel = typer.Option(AlertLevel.MEDIUM, "--alert-level", "-a", help="Minimum alert level to display"), |
|
|
interval: int = typer.Option(60, "--interval", "-i", min=10, help="Check interval in seconds"), |
|
|
export_alerts: Optional[Path] = typer.Option(None, "--export", "-e", help="Export alerts to file"), |
|
|
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="CIDADAO_API_KEY", help="API key"), |
|
|
): |
|
|
""" |
|
|
👀 Monitor government data in real-time for anomalies. |
|
|
|
|
|
This command runs continuous monitoring of government contracts and |
|
|
spending, alerting you when anomalies or suspicious patterns are detected. |
|
|
|
|
|
Monitoring Modes: |
|
|
- contracts: Monitor new contracts as they appear |
|
|
- organizations: Focus on specific organizations |
|
|
- suppliers: Track specific supplier activities |
|
|
- anomalies: Direct anomaly detection monitoring |
|
|
- all: Comprehensive monitoring of everything |
|
|
|
|
|
Examples: |
|
|
cidadao watch contracts --min-value 1000000 |
|
|
cidadao watch anomalies --threshold 0.8 --interval 30 |
|
|
cidadao watch organizations --org MIN_SAUDE MIN_EDUCACAO |
|
|
""" |
|
|
global shutdown_requested |
|
|
|
|
|
|
|
|
setup_signal_handlers() |
|
|
|
|
|
|
|
|
console.print(f"\n[bold blue]👀 Starting {mode.value} monitoring[/bold blue]") |
|
|
console.print(f"Alert threshold: [yellow]{threshold}[/yellow]") |
|
|
console.print(f"Check interval: [yellow]{interval}s[/yellow]") |
|
|
|
|
|
if organizations: |
|
|
console.print(f"Organizations: [cyan]{', '.join(organizations)}[/cyan]") |
|
|
if suppliers: |
|
|
console.print(f"Suppliers: [cyan]{', '.join(suppliers)}[/cyan]") |
|
|
|
|
|
console.print("\n[dim]Press Ctrl+C to stop monitoring[/dim]\n") |
|
|
|
|
|
|
|
|
config = MonitoringConfig( |
|
|
mode=mode, |
|
|
organizations=organizations or [], |
|
|
suppliers=suppliers or [], |
|
|
categories=categories or [], |
|
|
min_value=min_value, |
|
|
anomaly_threshold=threshold, |
|
|
alert_level=alert_level, |
|
|
check_interval=interval |
|
|
) |
|
|
|
|
|
|
|
|
stats = MonitoringStats(start_time=datetime.now()) |
|
|
|
|
|
|
|
|
layout = create_dashboard_layout() |
|
|
|
|
|
|
|
|
export_file = None |
|
|
if export_alerts: |
|
|
export_path = export_alerts.expanduser().resolve() |
|
|
export_file = open(export_path, "a", encoding="utf-8") |
|
|
export_file.write(f"# Cidadão.AI Watch Mode - Started at {stats.start_time.isoformat()}\n") |
|
|
export_file.write(f"# Mode: {mode.value}, Threshold: {threshold}\n\n") |
|
|
|
|
|
try: |
|
|
|
|
|
with Live(layout, refresh_per_second=1, console=console) as live: |
|
|
while not shutdown_requested: |
|
|
|
|
|
layout["header"].update(render_header(config)) |
|
|
layout["stats"].update(render_stats(stats)) |
|
|
layout["alerts"].update(render_alerts(stats)) |
|
|
layout["footer"].update(render_footer()) |
|
|
|
|
|
|
|
|
new_alerts = asyncio.run( |
|
|
check_for_anomalies(config, stats, auth_token=api_key) |
|
|
) |
|
|
|
|
|
|
|
|
if export_file and new_alerts: |
|
|
for alert in new_alerts: |
|
|
export_file.write( |
|
|
f"{alert['timestamp']} | {alert['level'].upper()} | " |
|
|
f"{alert['type']} | {alert['description']}\n" |
|
|
) |
|
|
export_file.flush() |
|
|
|
|
|
|
|
|
for alert in new_alerts: |
|
|
if alert["level"] in ["high", "critical"]: |
|
|
console.bell() |
|
|
|
|
|
|
|
|
for _ in range(config.check_interval): |
|
|
if shutdown_requested: |
|
|
break |
|
|
asyncio.run(asyncio_sleep(1)) |
|
|
|
|
|
|
|
|
layout["stats"].update(render_stats(stats)) |
|
|
|
|
|
|
|
|
console.print("\n[green]✅ Monitoring stopped gracefully[/green]") |
|
|
|
|
|
|
|
|
console.print( |
|
|
Panel( |
|
|
f"[bold]Monitoring Summary[/bold]\n\n" |
|
|
f"Duration: {datetime.now() - stats.start_time}\n" |
|
|
f"Total checks: {stats.checks_performed}\n" |
|
|
f"Anomalies detected: {stats.anomalies_detected}\n" |
|
|
f"Alerts triggered: {stats.alerts_triggered}", |
|
|
title="📊 Final Statistics", |
|
|
border_style="blue" |
|
|
) |
|
|
) |
|
|
|
|
|
if export_file: |
|
|
export_file.write(f"\n# Monitoring ended at {datetime.now().isoformat()}\n") |
|
|
export_file.write(f"# Total anomalies: {stats.anomalies_detected}\n") |
|
|
console.print(f"\n[green]Alerts exported to: {export_alerts}[/green]") |
|
|
|
|
|
except Exception as e: |
|
|
console.print(f"[red]❌ Error: {e}[/red]") |
|
|
raise typer.Exit(1) |
|
|
finally: |
|
|
if export_file: |
|
|
export_file.close() |
|
|
|
|
|
|
|
|
@app.command() |
|
|
def test_connection( |
|
|
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="CIDADAO_API_KEY", help="API key"), |
|
|
): |
|
|
""" |
|
|
🔌 Test connection to the API. |
|
|
|
|
|
Verify that the CLI can connect to the backend API. |
|
|
""" |
|
|
console.print("[yellow]Testing API connection...[/yellow]") |
|
|
|
|
|
try: |
|
|
|
|
|
result = asyncio.run( |
|
|
call_api("/health", auth_token=api_key) |
|
|
) |
|
|
|
|
|
console.print("[green]✅ API connection successful![/green]") |
|
|
console.print(f"Status: {result.get('status', 'unknown')}") |
|
|
|
|
|
|
|
|
if api_key: |
|
|
console.print("\n[yellow]Testing authenticated access...[/yellow]") |
|
|
user_info = asyncio.run( |
|
|
call_api("/api/v1/auth/me", auth_token=api_key) |
|
|
) |
|
|
console.print("[green]✅ Authentication successful![/green]") |
|
|
console.print(f"User: {user_info.get('email', 'unknown')}") |
|
|
|
|
|
except Exception as e: |
|
|
console.print(f"[red]❌ Connection failed: {e}[/red]") |
|
|
console.print("\n[dim]Make sure the API is running at http://localhost:8000[/dim]") |
|
|
raise typer.Exit(1) |
|
|
|
|
|
|
|
|
|
|
|
async def asyncio_sleep(seconds: float): |
|
|
"""Async sleep helper.""" |
|
|
await asyncio.sleep(seconds) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app() |