"""Notification service for alerts and updates.

This service integrates multiple notification channels:

- In-memory notifications (for UI)
- Email notifications
- Webhook notifications
- Push notifications (future)
"""
|
|
|
|
|
import asyncio |
|
|
from typing import Dict, List, Optional, Union, Any |
|
|
from datetime import datetime, timezone |
|
|
from enum import Enum |
|
|
from pydantic import BaseModel, Field |
|
|
import structlog |
|
|
|
|
|
|
|
|
# Optional integrations. The email/webhook services and the preference models
# are imported on a best-effort basis; when any of them cannot be imported the
# module still loads, every optional name is bound to None, and
# ADVANCED_FEATURES_AVAILABLE tells the rest of the module to skip those
# channels.
try:
    from src.services.email_service import email_service, EmailMessage
    from src.services.webhook_service import webhook_service, WebhookEvent
    from src.models.notification_models import NotificationPreference, NotificationChannel
    ADVANCED_FEATURES_AVAILABLE = True
except (ImportError, AttributeError):
    # Reset every name, not just the one that failed, so a partially
    # successful import never leaves a half-configured module behind.
    email_service = None
    webhook_service = None
    EmailMessage = None
    WebhookEvent = None
    NotificationPreference = None
    NotificationChannel = None
    ADVANCED_FEATURES_AVAILABLE = False
|
|
from src.core.logging import get_logger
from src.core import json_utils

# Module-level logger. Calls in this module pass their context as keyword
# arguments (e.g. ``logger.info("...", user_id=...)``).
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
class NotificationLevel(Enum):
    """Notification severity levels, listed from least to most severe."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
|
|
|
|
|
|
|
|
class NotificationType(Enum):
    """Kinds of events that can produce a notification."""

    INVESTIGATION_COMPLETE = "investigation_complete"
    ANOMALY_DETECTED = "anomaly_detected"
    AGENT_ERROR = "agent_error"
    SYSTEM_ALERT = "system_alert"
    REPORT_READY = "report_ready"
    EXPORT_COMPLETE = "export_complete"
|
|
|
|
|
|
|
|
class Notification(BaseModel):
    """A single notification record."""

    # Identifier assigned by whoever creates the notification.
    id: str
    type: NotificationType
    level: NotificationLevel
    title: str
    message: str
    # Creation time; defaults to the current time in UTC (timezone-aware).
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Free-form extra data attached to the notification.
    metadata: Optional[Dict[str, Any]] = None
    # Whether the notification has been marked as read.
    read: bool = False
    # Names of the delivery channels recorded for this notification.
    channels_sent: List[str] = Field(default_factory=list)
|
|
|
|
|
|
|
|
class NotificationService:
    """Service for managing notifications across multiple channels.

    Every notification is stored in an in-memory list capped at
    ``_max_notifications`` entries. When the optional email/webhook
    integrations are importable, notifications are also delivered through
    them.
    """

    def __init__(self):
        self._notifications: List[Notification] = []
        self._preferences: Dict[str, Dict[str, Any]] = {}
        self._max_notifications = 1000
        # Notification id -> recipient user id. The Notification model has no
        # recipient field, so ownership is tracked here to support the
        # per-user filters of get_notifications() and mark_all_as_read().
        self._owners: Dict[str, str] = {}
        # Monotonic id suffix. len(self._notifications) stops changing once
        # the list is trimmed to its cap, so it cannot disambiguate ids.
        self._sequence = 0

    async def send_notification(
        self,
        user_id: str,
        type: NotificationType,
        title: str,
        message: str,
        level: NotificationLevel = NotificationLevel.INFO,
        metadata: Optional[Dict[str, Any]] = None,
        channels: Optional[List[str]] = None
    ) -> Notification:
        """Store a notification and deliver it through the selected channels.

        Args:
            user_id: User to send the notification to.
            type: Type of notification.
            title: Notification title.
            message: Notification message.
            level: Severity level.
            metadata: Additional data attached to the notification.
            channels: Specific channels to use. When ``None`` the channels
                are chosen by ``_get_user_channels``.

        Returns:
            The created notification. ``channels_sent`` lists only the
            channels whose delivery reported success.
        """
        notification = Notification(
            id=f"notif_{datetime.now(timezone.utc).timestamp()}_{self._sequence}",
            type=type,
            level=level,
            title=title,
            message=message,
            metadata=metadata or {},
        )
        self._sequence += 1

        self._notifications.append(notification)
        self._owners[notification.id] = user_id
        self._trim_notifications()

        if channels is None:
            channels = await self._get_user_channels(user_id, type, level)

        # Keep each coroutine paired with the channel it serves so results
        # are attributed to the right channel no matter how ``channels`` is
        # ordered or which channels are skipped.
        pending = []
        if ADVANCED_FEATURES_AVAILABLE:
            if "email" in channels and email_service:
                pending.append(("email", self._send_email(user_id, notification)))
            if "webhook" in channels and webhook_service:
                pending.append(("webhook", self._send_webhook(notification)))
        # "push" is accepted as a channel name but has no delivery
        # implementation yet, so it is never reported as sent.

        if pending:
            outcomes = await asyncio.gather(
                *(delivery for _, delivery in pending),
                return_exceptions=True,
            )
            for (channel, _), outcome in zip(pending, outcomes):
                # The send helpers report failure by returning False, so an
                # exception check alone would count failures as sent.
                if not isinstance(outcome, BaseException) and outcome:
                    notification.channels_sent.append(channel)

        logger.info(
            "Notification sent",
            user_id=user_id,
            type=type.value,
            level=level.value,
            channels=notification.channels_sent,
        )

        return notification

    async def send_anomaly_alert(
        self,
        user_id: str,
        anomaly_data: Dict[str, Any]
    ) -> Notification:
        """Send an alert for a detected anomaly.

        ``anomaly_data["severity"]`` (case-insensitive: low / medium / high /
        critical) selects the notification level. A missing or empty severity
        is treated as "medium", and an unrecognised one maps to WARNING.
        ``anomaly_data["description"]`` becomes the message when present.
        The whole dict is attached as metadata.
        """
        # ``or`` also covers an explicit None/empty value, which would
        # otherwise crash on ``.upper()``.
        severity = str(anomaly_data.get("severity") or "medium")
        level_map = {
            "low": NotificationLevel.INFO,
            "medium": NotificationLevel.WARNING,
            "high": NotificationLevel.ERROR,
            "critical": NotificationLevel.CRITICAL,
        }

        return await self.send_notification(
            user_id=user_id,
            type=NotificationType.ANOMALY_DETECTED,
            title=f"Anomalia Detectada - {severity.upper()}",
            message=anomaly_data.get("description") or "Uma anomalia foi detectada no sistema",
            level=level_map.get(severity.lower(), NotificationLevel.WARNING),
            metadata=anomaly_data,
        )

    async def send_investigation_complete(
        self,
        user_id: str,
        investigation_id: str,
        results: Dict[str, Any]
    ) -> Notification:
        """Send a notification when an investigation is complete.

        Reads ``anomalies_count`` and ``confidence_score`` from ``results``
        (missing or ``None`` values count as 0). The level is WARNING when at
        least one anomaly was found, INFO otherwise.
        """
        anomalies_count = results.get("anomalies_count") or 0
        confidence_score = results.get("confidence_score") or 0

        title = "Investigação Concluída"
        if anomalies_count > 0:
            title += f" - {anomalies_count} Anomalias Encontradas"

        message = f"A investigação {investigation_id} foi concluída com sucesso. "
        message += f"Confiança: {confidence_score:.1f}%"

        return await self.send_notification(
            user_id=user_id,
            type=NotificationType.INVESTIGATION_COMPLETE,
            title=title,
            message=message,
            level=NotificationLevel.INFO if anomalies_count == 0 else NotificationLevel.WARNING,
            metadata={
                "investigation_id": investigation_id,
                **results,
            },
        )

    async def send_report_ready(
        self,
        user_id: str,
        report_id: str,
        report_type: str,
        download_url: Optional[str] = None
    ) -> Notification:
        """Send an INFO notification announcing that a report is ready."""
        return await self.send_notification(
            user_id=user_id,
            type=NotificationType.REPORT_READY,
            title=f"Relatório {report_type} Pronto",
            message=f"Seu relatório {report_type} está pronto para download",
            level=NotificationLevel.INFO,
            metadata={
                "report_id": report_id,
                "report_type": report_type,
                "download_url": download_url,
            },
        )

    async def _get_user_channels(
        self,
        user_id: str,
        type: NotificationType,
        level: NotificationLevel
    ) -> List[str]:
        """Choose delivery channels for a notification.

        NOTE(review): routing currently depends on ``level`` only. The
        preferences stored through ``set_user_preferences`` are not consulted
        here, and ``user_id`` / ``type`` are unused.
        """
        if level == NotificationLevel.CRITICAL:
            return ["email", "webhook", "push"]
        if level == NotificationLevel.ERROR:
            return ["email", "webhook"]
        # WARNING and INFO are both delivered by email only.
        return ["email"]

    async def _send_email(self, user_id: str, notification: Notification) -> bool:
        """Send a notification via email.

        Returns the email service's result, or ``False`` when the email
        integration is unavailable or sending raises.
        """
        if not ADVANCED_FEATURES_AVAILABLE or not email_service or not EmailMessage:
            return False

        try:
            # NOTE(review): placeholder address derived from the user id;
            # presumably to be replaced by a real user lookup - confirm.
            user_email = f"{user_id}@example.com"

            template_map = {
                NotificationType.INVESTIGATION_COMPLETE: "investigation_complete",
                NotificationType.ANOMALY_DETECTED: "anomaly_alert",
                NotificationType.REPORT_READY: "notification",
                NotificationType.EXPORT_COMPLETE: "notification",
                NotificationType.AGENT_ERROR: "notification",
                NotificationType.SYSTEM_ALERT: "notification",
            }
            template = template_map.get(notification.type, "notification")

            # Metadata is unpacked last, so its keys override the base
            # title/message/severity entries when they collide.
            template_data = {
                "title": notification.title,
                "message": notification.message,
                "severity": notification.level.value,
                **(notification.metadata or {}),
            }

            email = EmailMessage(
                to=[user_email],
                subject=notification.title,
                template=template,
                template_data=template_data,
            )

            return await email_service.send_email(email)

        except Exception as e:
            # Best-effort channel: log and report failure instead of raising.
            logger.error(
                "Failed to send email notification",
                user_id=user_id,
                notification_id=notification.id,
                error=str(e),
            )
            return False

    async def _send_webhook(self, notification: Notification) -> bool:
        """Send a notification via webhook.

        Returns ``True`` when at least one delivery reports success, and
        ``False`` when the webhook integration is unavailable or sending
        raises.
        """
        if not ADVANCED_FEATURES_AVAILABLE or not webhook_service or not WebhookEvent:
            return False

        try:
            event_map = {
                NotificationType.INVESTIGATION_COMPLETE: WebhookEvent.INVESTIGATION_COMPLETED,
                NotificationType.ANOMALY_DETECTED: WebhookEvent.ANOMALY_DETECTED,
                NotificationType.REPORT_READY: WebhookEvent.REPORT_GENERATED,
                NotificationType.EXPORT_COMPLETE: WebhookEvent.EXPORT_COMPLETED,
                NotificationType.AGENT_ERROR: WebhookEvent.AGENT_FAILED,
                NotificationType.SYSTEM_ALERT: WebhookEvent.SYSTEM_ALERT,
            }
            event = event_map.get(notification.type, WebhookEvent.SYSTEM_ALERT)

            deliveries = await webhook_service.send_event(
                event=event,
                data={
                    "id": notification.id,
                    "title": notification.title,
                    "message": notification.message,
                    "level": notification.level.value,
                    "timestamp": notification.timestamp.isoformat(),
                },
                metadata=notification.metadata,
            )

            return any(d.success for d in deliveries)

        except Exception as e:
            # Best-effort channel: log and report failure instead of raising.
            logger.error(
                "Failed to send webhook notification",
                notification_id=notification.id,
                error=str(e),
            )
            return False

    def get_notifications(
        self,
        user_id: Optional[str] = None,
        unread_only: bool = False,
        type: Optional[NotificationType] = None,
        level: Optional[NotificationLevel] = None,
        limit: int = 100
    ) -> List[Notification]:
        """Return stored notifications, newest first.

        Args:
            user_id: Only include notifications sent to this user; ``None``
                includes every user's notifications.
            unread_only: Only include notifications not yet marked as read.
            type: Only include notifications of this type.
            level: Only include notifications of this level.
            limit: Maximum number of notifications returned; values below
                zero are treated as zero.
        """
        selected = [
            n for n in self._notifications
            if (user_id is None or self._owners.get(n.id) == user_id)
            and not (unread_only and n.read)
            and (type is None or n.type == type)
            and (level is None or n.level == level)
        ]
        selected.sort(key=lambda n: n.timestamp, reverse=True)
        return selected[:max(limit, 0)]

    def mark_as_read(self, notification_id: str) -> bool:
        """Mark one notification as read; return ``False`` if the id is unknown."""
        for notification in self._notifications:
            if notification.id == notification_id:
                notification.read = True
                return True
        return False

    def mark_all_as_read(self, user_id: Optional[str] = None) -> int:
        """Mark unread notifications as read and return how many changed.

        When ``user_id`` is given, only that user's notifications are
        touched; ``None`` marks every stored notification.
        """
        count = 0
        for notification in self._notifications:
            if notification.read:
                continue
            if user_id is not None and self._owners.get(notification.id) != user_id:
                continue
            notification.read = True
            count += 1
        return count

    def delete_notification(self, notification_id: str) -> bool:
        """Delete a notification; return ``True`` if something was removed."""
        initial_count = len(self._notifications)
        self._notifications = [n for n in self._notifications if n.id != notification_id]
        self._owners.pop(notification_id, None)
        return len(self._notifications) < initial_count

    def _trim_notifications(self):
        """Drop the oldest notifications once the list exceeds its cap."""
        if len(self._notifications) <= self._max_notifications:
            return

        newest_first = sorted(
            self._notifications,
            key=lambda n: n.timestamp,
            reverse=True,
        )
        # Forget ownership of everything being discarded so the owner map
        # cannot grow without bound.
        for dropped in newest_first[self._max_notifications:]:
            self._owners.pop(dropped.id, None)
        self._notifications = newest_first[:self._max_notifications]

    async def set_user_preferences(
        self,
        user_id: str,
        preferences: Dict[str, Any]
    ) -> None:
        """Store (replace) the notification preferences for a user."""
        self._preferences[user_id] = preferences

    def get_user_preferences(self, user_id: str) -> Dict[str, Any]:
        """Return the stored preferences for a user, or ``{}`` if none are set."""
        return self._preferences.get(user_id, {})
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level service instance.
notification_service = NotificationService()