|
|
""" |
|
|
Module: api.middleware.webhook_verification |
|
|
Description: Webhook signature verification middleware |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-25 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import hmac |
|
|
import hashlib |
|
|
import time |
|
|
from typing import Optional, Dict, Any |
|
|
|
|
|
from fastapi import Request, HTTPException, status |
|
|
from starlette.middleware.base import BaseHTTPMiddleware |
|
|
from starlette.responses import JSONResponse |
|
|
|
|
|
from src.core import get_logger |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
class WebhookVerificationMiddleware(BaseHTTPMiddleware): |
|
|
""" |
|
|
Middleware for verifying incoming webhook signatures. |
|
|
|
|
|
Protects endpoints that receive webhooks from external services. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
app, |
|
|
webhook_paths: Optional[Dict[str, str]] = None, |
|
|
max_timestamp_age: int = 300 |
|
|
): |
|
|
""" |
|
|
Initialize webhook verification middleware. |
|
|
|
|
|
Args: |
|
|
app: FastAPI application |
|
|
webhook_paths: Dict of path -> secret mapping |
|
|
max_timestamp_age: Maximum age of timestamp in seconds |
|
|
""" |
|
|
super().__init__(app) |
|
|
self.webhook_paths = webhook_paths or {} |
|
|
self.max_timestamp_age = max_timestamp_age |
|
|
|
|
|
async def dispatch(self, request: Request, call_next): |
|
|
"""Process request with webhook verification.""" |
|
|
|
|
|
if request.url.path not in self.webhook_paths: |
|
|
return await call_next(request) |
|
|
|
|
|
|
|
|
secret = self.webhook_paths[request.url.path] |
|
|
|
|
|
try: |
|
|
|
|
|
body = await request.body() |
|
|
|
|
|
|
|
|
if not self._verify_signature(request, body, secret): |
|
|
logger.warning( |
|
|
"webhook_signature_verification_failed", |
|
|
path=request.url.path, |
|
|
headers=dict(request.headers) |
|
|
) |
|
|
|
|
|
return JSONResponse( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
content={ |
|
|
"detail": "Invalid webhook signature", |
|
|
"error": "INVALID_SIGNATURE" |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
if not self._verify_timestamp(request): |
|
|
logger.warning( |
|
|
"webhook_timestamp_verification_failed", |
|
|
path=request.url.path |
|
|
) |
|
|
|
|
|
return JSONResponse( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
content={ |
|
|
"detail": "Webhook timestamp too old", |
|
|
"error": "TIMESTAMP_TOO_OLD" |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
request.state.webhook_body = body |
|
|
|
|
|
|
|
|
return await call_next(request) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error( |
|
|
"webhook_verification_error", |
|
|
error=str(e), |
|
|
exc_info=True |
|
|
) |
|
|
|
|
|
return JSONResponse( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
content={ |
|
|
"detail": "Webhook verification error", |
|
|
"error": "VERIFICATION_ERROR" |
|
|
} |
|
|
) |
|
|
|
|
|
def _verify_signature( |
|
|
self, |
|
|
request: Request, |
|
|
body: bytes, |
|
|
secret: str |
|
|
) -> bool: |
|
|
"""Verify webhook signature.""" |
|
|
|
|
|
signature_header = ( |
|
|
request.headers.get("X-Cidadao-Signature") or |
|
|
request.headers.get("X-Webhook-Signature") or |
|
|
request.headers.get("X-Hub-Signature-256") |
|
|
) |
|
|
|
|
|
if not signature_header: |
|
|
logger.debug("No signature header found") |
|
|
return False |
|
|
|
|
|
|
|
|
if "=" in signature_header: |
|
|
algorithm, signature = signature_header.split("=", 1) |
|
|
else: |
|
|
algorithm = "sha256" |
|
|
signature = signature_header |
|
|
|
|
|
|
|
|
if algorithm == "sha256": |
|
|
expected = hmac.new( |
|
|
secret.encode(), |
|
|
body, |
|
|
hashlib.sha256 |
|
|
).hexdigest() |
|
|
elif algorithm == "sha1": |
|
|
expected = hmac.new( |
|
|
secret.encode(), |
|
|
body, |
|
|
hashlib.sha1 |
|
|
).hexdigest() |
|
|
else: |
|
|
logger.warning(f"Unsupported signature algorithm: {algorithm}") |
|
|
return False |
|
|
|
|
|
|
|
|
return hmac.compare_digest(signature, expected) |
|
|
|
|
|
def _verify_timestamp(self, request: Request) -> bool: |
|
|
"""Verify webhook timestamp is recent.""" |
|
|
timestamp_header = ( |
|
|
request.headers.get("X-Cidadao-Timestamp") or |
|
|
request.headers.get("X-Webhook-Timestamp") |
|
|
) |
|
|
|
|
|
if not timestamp_header: |
|
|
|
|
|
return True |
|
|
|
|
|
try: |
|
|
|
|
|
if timestamp_header.isdigit(): |
|
|
|
|
|
webhook_time = float(timestamp_header) |
|
|
else: |
|
|
|
|
|
from dateutil.parser import parse |
|
|
webhook_time = parse(timestamp_header).timestamp() |
|
|
|
|
|
|
|
|
current_time = time.time() |
|
|
age = abs(current_time - webhook_time) |
|
|
|
|
|
return age <= self.max_timestamp_age |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to parse timestamp: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def create_webhook_signature( |
|
|
payload: bytes, |
|
|
secret: str, |
|
|
algorithm: str = "sha256" |
|
|
) -> str: |
|
|
""" |
|
|
Create webhook signature for outgoing webhooks. |
|
|
|
|
|
Args: |
|
|
payload: Request body |
|
|
secret: Webhook secret |
|
|
algorithm: Hash algorithm (sha256, sha1) |
|
|
|
|
|
Returns: |
|
|
Signature string with format "algorithm=signature" |
|
|
""" |
|
|
if algorithm == "sha256": |
|
|
signature = hmac.new( |
|
|
secret.encode(), |
|
|
payload, |
|
|
hashlib.sha256 |
|
|
).hexdigest() |
|
|
elif algorithm == "sha1": |
|
|
signature = hmac.new( |
|
|
secret.encode(), |
|
|
payload, |
|
|
hashlib.sha1 |
|
|
).hexdigest() |
|
|
else: |
|
|
raise ValueError(f"Unsupported algorithm: {algorithm}") |
|
|
|
|
|
return f"{algorithm}={signature}" |
|
|
|
|
|
|
|
|
def verify_webhook_signature( |
|
|
signature: str, |
|
|
payload: bytes, |
|
|
secret: str |
|
|
) -> bool: |
|
|
""" |
|
|
Verify webhook signature. |
|
|
|
|
|
Args: |
|
|
signature: Signature header value |
|
|
payload: Request body |
|
|
secret: Webhook secret |
|
|
|
|
|
Returns: |
|
|
True if signature is valid |
|
|
""" |
|
|
try: |
|
|
|
|
|
if "=" in signature: |
|
|
algorithm, sig = signature.split("=", 1) |
|
|
else: |
|
|
algorithm = "sha256" |
|
|
sig = signature |
|
|
|
|
|
|
|
|
expected = create_webhook_signature(payload, secret, algorithm) |
|
|
|
|
|
|
|
|
if "=" in expected: |
|
|
_, expected_sig = expected.split("=", 1) |
|
|
else: |
|
|
expected_sig = expected |
|
|
|
|
|
|
|
|
return hmac.compare_digest(sig, expected_sig) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Signature verification error: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
class WebhookSigner: |
|
|
"""Helper class for signing webhook requests.""" |
|
|
|
|
|
def __init__(self, secret: str, algorithm: str = "sha256"): |
|
|
"""Initialize webhook signer.""" |
|
|
self.secret = secret |
|
|
self.algorithm = algorithm |
|
|
|
|
|
def sign(self, payload: bytes) -> Dict[str, str]: |
|
|
""" |
|
|
Generate webhook headers with signature. |
|
|
|
|
|
Args: |
|
|
payload: Request body |
|
|
|
|
|
Returns: |
|
|
Dict of headers to include in request |
|
|
""" |
|
|
signature = create_webhook_signature( |
|
|
payload, |
|
|
self.secret, |
|
|
self.algorithm |
|
|
) |
|
|
|
|
|
timestamp = str(int(time.time())) |
|
|
|
|
|
return { |
|
|
"X-Cidadao-Signature": signature, |
|
|
"X-Cidadao-Timestamp": timestamp, |
|
|
"X-Cidadao-Algorithm": self.algorithm |
|
|
} |