anderson-ufrj
feat(security): implement IP whitelist for production environments
f70869e
"""
Module: api.routes.webhooks
Description: Webhook endpoints for receiving external events
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
from typing import Dict, Any, Optional
from datetime import datetime
from fastapi import APIRouter, Request, Depends, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, Field
from src.core import get_logger
from src.api.dependencies import get_current_user
from src.services.webhook_service import WebhookConfig, WebhookEvent, webhook_service
from src.api.middleware.webhook_verification import verify_webhook_signature
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Router holding every endpoint defined below; all paths are prefixed
# with /webhooks and tagged "Webhooks".
router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
class IncomingWebhookPayload(BaseModel):
    """Generic incoming webhook payload."""
    # Name of the event being delivered (required).
    event: str
    # Optional event time; None when the sender omits it.
    timestamp: Optional[datetime] = None
    # Free-form event data (required).
    data: Dict[str, Any]
    # Optional signature carried in the body; None when omitted.
    # NOTE(review): receive_generic_webhook verifies the
    # X-Webhook-Signature header, not this field -- confirm this field
    # is still needed.
    signature: Optional[str] = None
class WebhookRegistrationRequest(BaseModel):
    """Request to register a new webhook."""
    # Destination URL of the webhook (required, plain string).
    url: str = Field(..., description="Webhook endpoint URL")
    # Event names to subscribe to; None subscribes to all events.
    events: Optional[list[str]] = Field(None, description="Events to subscribe to (None = all)")
    # Optional shared secret used for HMAC signing.
    secret: Optional[str] = Field(None, description="Webhook secret for HMAC signing")
    # Optional custom headers for the webhook.
    headers: Optional[Dict[str, str]] = Field(None, description="Custom headers")
    # Whether the webhook is active; defaults to True.
    active: bool = Field(True, description="Whether webhook is active")
class WebhookTestRequest(BaseModel):
    """Request to test a webhook."""
    # URL that the test delivery is sent to (required, plain string).
    url: str = Field(..., description="Webhook URL to test")
    # Optional secret for the test delivery; None when not provided.
    secret: Optional[str] = Field(None, description="Webhook secret if any")
@router.post("/incoming/github")
async def receive_github_webhook(
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Receive webhooks from GitHub.
    Requires webhook signature verification.

    The raw body is taken from ``request.state.webhook_body`` when it is
    present (set by the verification middleware) and is otherwise read
    directly from the request. The payload must be a JSON object; the
    actual processing is scheduled as a background task.

    Returns:
        A dict with ``status`` set to "accepted" and the GitHub event type.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is valid
            JSON that is not an object.
    """
    import json

    # Get raw body from request state (set by verification middleware)
    body = getattr(request.state, "webhook_body", None)
    if not body:
        body = await request.body()

    # Parse event type
    event_type = request.headers.get("X-GitHub-Event", "unknown")

    # Parse payload. json.loads raises ValueError subclasses for malformed
    # JSON / undecodable bytes and TypeError for a non str/bytes body.
    try:
        payload = json.loads(body)
    except (ValueError, TypeError) as e:
        logger.error("Failed to parse GitHub webhook", error=str(e))
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid payload format"
        ) from e

    # Valid JSON that is not an object (list, string, number, null) would
    # otherwise fail on ``payload.get`` below and surface as a 500.
    if not isinstance(payload, dict):
        logger.error(
            "Failed to parse GitHub webhook",
            error=f"expected JSON object, got {type(payload).__name__}"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid payload format"
        )

    # "repository" may be absent or null; only read it when it is an object.
    repository = payload.get("repository")
    repository_name = (
        repository.get("full_name") if isinstance(repository, dict) else None
    )

    # Log webhook received
    logger.info(
        "github_webhook_received",
        event=event_type,
        repository=repository_name,
        action=payload.get("action")
    )

    # Process webhook asynchronously
    background_tasks.add_task(
        process_github_webhook,
        event_type,
        payload
    )
    return {"status": "accepted", "event": event_type}
@router.post("/incoming/generic/{webhook_id}")
async def receive_generic_webhook(
    webhook_id: str,
    request: Request,
    payload: IncomingWebhookPayload,
    background_tasks: BackgroundTasks
):
    """
    Receive generic webhooks with configurable verification.
    The webhook_id should match a configured incoming webhook.

    When the matching configuration has a secret, the request must carry
    an ``X-Webhook-Signature`` header that verifies against the raw body.
    Processing itself is scheduled as a background task.

    Returns:
        A dict with ``status`` "accepted", the webhook id and the event name.

    Raises:
        HTTPException: 404 when no configuration exists for ``webhook_id``
            or the configuration is marked inactive; 401 when a secret is
            configured and the signature is missing or invalid.
    """
    # Verify webhook ID exists and get configuration
    # In production, this would look up from database
    webhook_config = get_incoming_webhook_config(webhook_id)

    # A configuration explicitly marked inactive is treated like a missing
    # one, so a deactivated webhook cannot keep delivering events. A config
    # without an "active" key keeps the previous (accepted) behavior.
    if not webhook_config or not webhook_config.get("active", True):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Webhook configuration not found: {webhook_id}"
        )

    # Verify signature if secret is configured
    secret = webhook_config.get("secret")
    if secret:
        body = await request.body()
        signature = request.headers.get("X-Webhook-Signature")
        if not signature:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Missing webhook signature"
            )
        if not verify_webhook_signature(signature, body, secret):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid webhook signature"
            )

    # Log webhook
    logger.info(
        "generic_webhook_received",
        webhook_id=webhook_id,
        event=payload.event,
        timestamp=payload.timestamp
    )

    # Process asynchronously
    background_tasks.add_task(
        process_generic_webhook,
        webhook_id,
        payload
    )
    return {
        "status": "accepted",
        "webhook_id": webhook_id,
        "event": payload.event
    }
@router.post("/register")
async def register_webhook(
    request: WebhookRegistrationRequest,
    current_user=Depends(get_current_user)
):
    """
    Register a new outgoing webhook.
    Requires authentication.

    Event names from the request are converted to ``WebhookEvent`` members
    before the configuration is handed to the webhook service. A missing
    or empty event list is passed on as ``None``.

    Returns:
        A dict echoing the registered url, events and active flag with
        ``status`` "registered".

    Raises:
        HTTPException: 400 when an event name is not a valid WebhookEvent.
    """
    # Translate the requested event names into enum members, stopping at
    # the first name that is not recognised.
    subscribed = None
    if request.events:
        subscribed = []
        for event_name in request.events:
            try:
                subscribed.append(WebhookEvent(event_name))
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid event type: {e}"
                )

    # Build the configuration and hand it to the service.
    webhook_service.add_webhook(
        WebhookConfig(
            url=request.url,
            events=subscribed,
            secret=request.secret,
            headers=request.headers,
            active=request.active
        )
    )

    logger.info(
        "webhook_registered",
        user=current_user.get("email"),
        url=request.url,
        events=request.events
    )

    response = {"status": "registered"}
    response["url"] = request.url
    response["events"] = request.events
    response["active"] = request.active
    return response
@router.delete("/unregister")
async def unregister_webhook(
    url: str,
    current_user=Depends(get_current_user)
):
    """
    Unregister a webhook.
    Requires authentication.

    Returns:
        A dict with ``status`` "unregistered" and the removed url.

    Raises:
        HTTPException: 404 when the service reports nothing was removed.
    """
    was_removed = webhook_service.remove_webhook(url)
    if was_removed:
        logger.info(
            "webhook_unregistered",
            user=current_user.get("email"),
            url=url
        )
        return {"status": "unregistered", "url": url}

    # Nothing was removed for this URL.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Webhook not found: {url}"
    )
@router.get("/list")
async def list_webhooks(
    current_user=Depends(get_current_user)
):
    """
    List all registered outgoing webhooks.
    Requires authentication.

    Returns:
        A dict with a ``webhooks`` list (url, events, active, has_secret
        per entry) and the ``total`` count. Secrets are never returned,
        only whether one is set.
    """
    registered = webhook_service.list_webhooks()

    entries = []
    for hook in registered:
        if hook.events:
            event_names = [member.value for member in hook.events]
        else:
            event_names = None
        entries.append(
            {
                "url": str(hook.url),
                "events": event_names,
                "active": hook.active,
                "has_secret": bool(hook.secret)
            }
        )

    return {"webhooks": entries, "total": len(registered)}
@router.post("/test")
async def test_webhook(
    request: WebhookTestRequest,
    background_tasks: BackgroundTasks,
    current_user=Depends(get_current_user)
):
    """
    Test a webhook endpoint.
    Sends a test payload to verify webhook is working.

    A temporary configuration (one retry, timeout of 10) is built from the
    request and passed to the webhook service; the delivery result is
    logged and returned.

    NOTE(review): the target URL is supplied by the caller and the
    delivery's response body is echoed back -- confirm the service
    restricts which hosts may be targeted.

    Returns:
        A dict with url, success, status_code, response, error and
        duration_ms taken from the delivery result.
    """
    # Temporary configuration used only for this test delivery.
    temporary_config = WebhookConfig(
        url=request.url,
        secret=request.secret,
        max_retries=1,
        timeout=10
    )

    result = await webhook_service.test_webhook(temporary_config)

    logger.info(
        "webhook_tested",
        user=current_user.get("email"),
        url=request.url,
        success=result.success,
        status_code=result.status_code
    )

    summary = {"url": request.url}
    summary["success"] = result.success
    summary["status_code"] = result.status_code
    summary["response"] = result.response_body
    summary["error"] = result.error
    summary["duration_ms"] = result.duration_ms
    return summary
@router.get("/history")
async def get_webhook_history(
    event: Optional[str] = None,
    url: Optional[str] = None,
    success: Optional[bool] = None,
    limit: int = 100,
    current_user=Depends(get_current_user)
):
    """
    Get webhook delivery history.
    Requires authentication.

    Args:
        event: Optional event name; must be a valid WebhookEvent value.
        url: Optional webhook URL filter, passed to the service as-is.
        success: Optional success filter, passed to the service as-is.
        limit: Maximum number of deliveries requested; must not be negative.

    Returns:
        A dict with a ``deliveries`` list and the ``total`` number of
        entries returned.

    Raises:
        HTTPException: 400 when ``limit`` is negative or ``event`` is not
            a valid WebhookEvent value.
    """
    # A negative limit has no sensible meaning; reject it here instead of
    # passing it on to the service.
    if limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid limit: {limit}"
        )

    # Convert event string to enum if provided
    event_enum = None
    if event:
        try:
            event_enum = WebhookEvent(event)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid event type: {event}"
            ) from exc

    history = webhook_service.get_delivery_history(
        event=event_enum,
        url=url,
        success=success,
        limit=limit
    )
    return {
        "deliveries": [
            {
                "webhook_url": d.webhook_url,
                "event": d.event.value,
                "timestamp": d.timestamp.isoformat(),
                "success": d.success,
                "status_code": d.status_code,
                "error": d.error,
                "attempts": d.attempts,
                "duration_ms": d.duration_ms
            }
            for d in history
        ],
        "total": len(history)
    }
# Helper functions
def get_incoming_webhook_config(webhook_id: str) -> Optional[Dict[str, Any]]:
    """Return the mock configuration for an incoming webhook, or None.

    Only the ids "test" and "monitoring" are known. Each call returns a
    freshly built dict with the keys "secret" and "active".
    """
    # In production, this would be from database.
    # NOTE(review): these secrets are hard-coded placeholders; they should
    # not ship as real credentials.
    mock_secrets = {
        "test": "test-secret",
        "monitoring": "monitoring-secret",
    }
    try:
        secret = mock_secrets[webhook_id]
    except KeyError:
        return None
    return {"secret": secret, "active": True}
async def process_github_webhook(event_type: str, payload: Dict[str, Any]):
    """Process GitHub webhook asynchronously.

    Currently only logs a message for the "push", "pull_request" and
    "issues" event types; every other event type is ignored. Any
    exception raised while handling is logged and not re-raised.
    """
    # Event type -> log message; add more event handlers as needed.
    known_events = (
        ("push", "Processing GitHub push event"),
        ("pull_request", "Processing GitHub pull request event"),
        ("issues", "Processing GitHub issues event"),
    )
    try:
        for name, message in known_events:
            if event_type == name:
                logger.info(message)
                break
    except Exception as e:
        logger.error(
            "Failed to process GitHub webhook",
            event=event_type,
            error=str(e),
            exc_info=True
        )
async def process_generic_webhook(webhook_id: str, payload: IncomingWebhookPayload):
    """Process generic webhook asynchronously.

    Currently only logs the webhook id and event name. Any exception
    raised while handling is logged and not re-raised.
    """
    try:
        # Route to appropriate handler based on webhook_id;
        # specific processing logic goes here.
        event_name = payload.event
        logger.info(
            "Processing generic webhook",
            webhook_id=webhook_id,
            event=event_name
        )
    except Exception as e:
        logger.error(
            "Failed to process generic webhook",
            webhook_id=webhook_id,
            error=str(e),
            exc_info=True
        )