| """Webhook service for sending notifications to external endpoints. |
| |
| This service provides async webhook delivery with: |
| - Retry logic with exponential backoff |
| - Request signing for security |
| - Batch webhook sending |
| - Event filtering |
| - Delivery status tracking |
| """ |
|
|
| import asyncio |
| import hashlib |
| import hmac |
| from datetime import datetime, timezone |
| from typing import List, Dict, Any, Optional, Union |
| from enum import Enum |
| import httpx |
| from pydantic import BaseModel, HttpUrl, Field, validator |
| import structlog |
| from tenacity import ( |
| retry, |
| stop_after_attempt, |
| wait_exponential, |
| retry_if_exception_type |
| ) |
|
|
| from src.core.config import settings |
| from src.core.logging import get_logger |
| from src.core import json_utils |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| class WebhookEvent(str, Enum): |
| """Webhook event types.""" |
| INVESTIGATION_CREATED = "investigation.created" |
| INVESTIGATION_COMPLETED = "investigation.completed" |
| INVESTIGATION_FAILED = "investigation.failed" |
| |
| ANOMALY_DETECTED = "anomaly.detected" |
| ANOMALY_RESOLVED = "anomaly.resolved" |
| |
| AGENT_STARTED = "agent.started" |
| AGENT_COMPLETED = "agent.completed" |
| AGENT_FAILED = "agent.failed" |
| |
| REPORT_GENERATED = "report.generated" |
| EXPORT_COMPLETED = "export.completed" |
| |
| USER_REGISTERED = "user.registered" |
| USER_LOGIN = "user.login" |
| |
| SYSTEM_ALERT = "system.alert" |
| SYSTEM_ERROR = "system.error" |
|
|
|
|
| class WebhookPayload(BaseModel): |
| """Webhook payload model.""" |
| event: WebhookEvent |
| timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) |
| data: Dict[str, Any] |
| metadata: Optional[Dict[str, Any]] = None |
| |
| @validator("timestamp", pre=True) |
| def ensure_timezone(cls, v): |
| """Ensure timestamp has timezone.""" |
| if isinstance(v, datetime) and v.tzinfo is None: |
| return v.replace(tzinfo=timezone.utc) |
| return v |
|
|
|
|
| class WebhookConfig(BaseModel): |
| """Webhook configuration model.""" |
| url: HttpUrl |
| secret: Optional[str] = None |
| events: Optional[List[WebhookEvent]] = None |
| headers: Optional[Dict[str, str]] = None |
| timeout: int = Field(default=30, ge=1, le=300) |
| max_retries: int = Field(default=3, ge=0, le=10) |
| active: bool = Field(default=True) |
| |
| def should_send_event(self, event: WebhookEvent) -> bool: |
| """Check if webhook should receive this event.""" |
| if not self.active: |
| return False |
| if self.events is None: |
| return True |
| return event in self.events |
|
|
|
|
| class WebhookDelivery(BaseModel): |
| """Webhook delivery result.""" |
| webhook_url: str |
| event: WebhookEvent |
| timestamp: datetime |
| status_code: Optional[int] = None |
| response_body: Optional[str] = None |
| error: Optional[str] = None |
| attempts: int = 0 |
| success: bool = False |
| duration_ms: Optional[float] = None |
|
|
|
|
| class WebhookService: |
| """Service for managing and sending webhooks.""" |
| |
| def __init__(self): |
| """Initialize webhook service.""" |
| self._webhooks: List[WebhookConfig] = [] |
| self._client = httpx.AsyncClient( |
| timeout=httpx.Timeout(30.0), |
| follow_redirects=False, |
| limits=httpx.Limits(max_connections=100, max_keepalive_connections=20) |
| ) |
| self._delivery_history: List[WebhookDelivery] = [] |
| self._max_history = 1000 |
| |
| def add_webhook(self, webhook: WebhookConfig) -> None: |
| """Add a webhook configuration.""" |
| self._webhooks.append(webhook) |
| logger.info( |
| "Webhook added", |
| url=str(webhook.url), |
| events=webhook.events, |
| active=webhook.active |
| ) |
| |
| def remove_webhook(self, url: str) -> bool: |
| """Remove a webhook by URL.""" |
| initial_count = len(self._webhooks) |
| self._webhooks = [w for w in self._webhooks if str(w.url) != url] |
| removed = len(self._webhooks) < initial_count |
| |
| if removed: |
| logger.info("Webhook removed", url=url) |
| |
| return removed |
| |
| def list_webhooks(self) -> List[WebhookConfig]: |
| """List all configured webhooks.""" |
| return self._webhooks.copy() |
| |
| def _generate_signature(self, payload: bytes, secret: str) -> str: |
| """Generate HMAC signature for webhook payload.""" |
| signature = hmac.new( |
| secret.encode(), |
| payload, |
| hashlib.sha256 |
| ).hexdigest() |
| return f"sha256={signature}" |
| |
| def _prepare_request( |
| self, |
| webhook: WebhookConfig, |
| payload: WebhookPayload |
| ) -> tuple[Dict[str, str], bytes]: |
| """Prepare webhook request headers and body.""" |
| |
| body_data = { |
| "event": payload.event, |
| "timestamp": payload.timestamp.isoformat(), |
| "data": payload.data, |
| } |
| if payload.metadata: |
| body_data["metadata"] = payload.metadata |
| |
| body = json_utils.dumps(body_data).encode() |
| |
| |
| headers = { |
| "Content-Type": "application/json", |
| "User-Agent": "Cidadao.AI/1.0", |
| "X-Cidadao-Event": payload.event, |
| "X-Cidadao-Timestamp": payload.timestamp.isoformat(), |
| } |
| |
| |
| if webhook.secret: |
| headers["X-Cidadao-Signature"] = self._generate_signature(body, webhook.secret) |
| |
| |
| if webhook.headers: |
| headers.update(webhook.headers) |
| |
| return headers, body |
| |
| @retry( |
| stop=stop_after_attempt(3), |
| wait=wait_exponential(multiplier=1, min=4, max=10), |
| retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)) |
| ) |
| async def _send_webhook( |
| self, |
| webhook: WebhookConfig, |
| payload: WebhookPayload |
| ) -> WebhookDelivery: |
| """Send a single webhook with retry logic.""" |
| delivery = WebhookDelivery( |
| webhook_url=str(webhook.url), |
| event=payload.event, |
| timestamp=datetime.now(timezone.utc) |
| ) |
| |
| try: |
| |
| headers, body = self._prepare_request(webhook, payload) |
| |
| |
| start_time = asyncio.get_event_loop().time() |
| response = await self._client.post( |
| str(webhook.url), |
| headers=headers, |
| content=body, |
| timeout=webhook.timeout |
| ) |
| end_time = asyncio.get_event_loop().time() |
| |
| |
| delivery.status_code = response.status_code |
| delivery.response_body = response.text[:1000] |
| delivery.success = 200 <= response.status_code < 300 |
| delivery.duration_ms = (end_time - start_time) * 1000 |
| delivery.attempts = 1 |
| |
| if not delivery.success: |
| logger.warning( |
| "Webhook delivery failed", |
| url=str(webhook.url), |
| status_code=response.status_code, |
| response=delivery.response_body |
| ) |
| |
| except Exception as e: |
| delivery.error = str(e) |
| delivery.success = False |
| logger.error( |
| "Webhook delivery error", |
| url=str(webhook.url), |
| error=str(e), |
| exc_info=True |
| ) |
| raise |
| |
| return delivery |
| |
| async def send_event( |
| self, |
| event: WebhookEvent, |
| data: Dict[str, Any], |
| metadata: Optional[Dict[str, Any]] = None |
| ) -> List[WebhookDelivery]: |
| """Send an event to all configured webhooks. |
| |
| Args: |
| event: Event type |
| data: Event data |
| metadata: Optional metadata |
| |
| Returns: |
| List of delivery results |
| """ |
| payload = WebhookPayload( |
| event=event, |
| data=data, |
| metadata=metadata |
| ) |
| |
| |
| webhooks_to_send = [ |
| webhook for webhook in self._webhooks |
| if webhook.should_send_event(event) |
| ] |
| |
| if not webhooks_to_send: |
| logger.debug("No webhooks configured for event", event=event) |
| return [] |
| |
| |
| tasks = [ |
| self._send_webhook(webhook, payload) |
| for webhook in webhooks_to_send |
| ] |
| |
| deliveries = await asyncio.gather(*tasks, return_exceptions=True) |
| |
| |
| results = [] |
| for delivery in deliveries: |
| if isinstance(delivery, Exception): |
| |
| delivery = WebhookDelivery( |
| webhook_url="unknown", |
| event=event, |
| timestamp=datetime.now(timezone.utc), |
| error=str(delivery), |
| success=False |
| ) |
| results.append(delivery) |
| |
| |
| self._store_deliveries(results) |
| |
| |
| successful = sum(1 for d in results if d.success) |
| logger.info( |
| "Webhooks sent", |
| event=event, |
| total=len(results), |
| successful=successful, |
| failed=len(results) - successful |
| ) |
| |
| return results |
| |
| def _store_deliveries(self, deliveries: List[WebhookDelivery]) -> None: |
| """Store delivery results in history.""" |
| self._delivery_history.extend(deliveries) |
| |
| |
| if len(self._delivery_history) > self._max_history: |
| self._delivery_history = self._delivery_history[-self._max_history:] |
| |
| def get_delivery_history( |
| self, |
| event: Optional[WebhookEvent] = None, |
| url: Optional[str] = None, |
| success: Optional[bool] = None, |
| limit: int = 100 |
| ) -> List[WebhookDelivery]: |
| """Get webhook delivery history with filtering.""" |
| history = self._delivery_history.copy() |
| |
| |
| if event: |
| history = [d for d in history if d.event == event] |
| if url: |
| history = [d for d in history if d.webhook_url == url] |
| if success is not None: |
| history = [d for d in history if d.success == success] |
| |
| |
| history.sort(key=lambda d: d.timestamp, reverse=True) |
| return history[:limit] |
| |
| async def test_webhook(self, webhook: WebhookConfig) -> WebhookDelivery: |
| """Test a webhook configuration.""" |
| test_payload = WebhookPayload( |
| event=WebhookEvent.SYSTEM_ALERT, |
| data={ |
| "type": "test", |
| "message": "This is a test webhook from Cidadão.AI", |
| "timestamp": datetime.now(timezone.utc).isoformat() |
| }, |
| metadata={"test": True} |
| ) |
| |
| return await self._send_webhook(webhook, test_payload) |
| |
| async def close(self): |
| """Close the webhook service.""" |
| await self._client.aclose() |
|
|
|
|
| |
| webhook_service = WebhookService() |
|
|
|
|
| |
| async def send_webhook_event( |
| event: WebhookEvent, |
| data: Dict[str, Any], |
| metadata: Optional[Dict[str, Any]] = None |
| ) -> List[WebhookDelivery]: |
| """Send a webhook event using the default service.""" |
| return await webhook_service.send_event(event, data, metadata) |
|
|
|
|
| async def register_webhook( |
| url: str, |
| events: Optional[List[WebhookEvent]] = None, |
| secret: Optional[str] = None, |
| **kwargs |
| ) -> None: |
| """Register a new webhook.""" |
| config = WebhookConfig( |
| url=url, |
| events=events, |
| secret=secret, |
| **kwargs |
| ) |
| webhook_service.add_webhook(config) |