"""
Module: utils.cors_validator
Description: CORS configuration validator and tester
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
|
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import httpx

from src.core import get_logger
from src.core.config import settings
|
|
|
|
|
# Module-level logger; the calls in this file pass a short event name plus
# keyword fields (e.g. origin=..., endpoint=...).
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
class CORSValidator:
    """Validate and test the CORS configuration of a running API.

    The validator issues real HTTP requests (through ``httpx``) that carry an
    ``Origin`` header against ``base_url`` and inspects the
    ``Access-Control-*`` headers of the responses. It can also render sample
    nginx and Cloudflare snippets.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """Initialize CORS validator.

        Args:
            base_url: Base URL of the API under test. It is stored unchanged;
                a trailing slash is tolerated when request URLs are built.
        """
        self.base_url = base_url
        self.test_endpoints = [
            "/",
            "/health",
            "/api/v1/chat/message",
            "/api/v1/investigations/analyze",
            "/api/v1/auth/login",
        ]

    def _build_url(self, endpoint: str) -> str:
        """Join ``base_url`` and ``endpoint`` without producing ``//``."""
        if endpoint.startswith("/"):
            return f"{self.base_url.rstrip('/')}{endpoint}"
        return f"{self.base_url}{endpoint}"

    @staticmethod
    def _extract_cors_headers(response_headers) -> Dict[str, str]:
        """Return only the ``Access-Control-*`` entries of a header mapping.

        Keys are kept exactly as the mapping yields them.
        """
        return {
            name: response_headers[name]
            for name in response_headers
            if name.lower().startswith("access-control-")
        }

    @staticmethod
    def _origin_allowed(origin: str, cors_headers: Dict[str, str]) -> bool:
        """Tell whether ``cors_headers`` grant access to ``origin``.

        The header name is matched case-insensitively: HTTP header names are
        case-insensitive, and a client library may hand them back lower-cased,
        so an exact-case dictionary lookup can silently miss the header.

        Returns:
            True when ``Access-Control-Allow-Origin`` is present and equals
            ``origin`` or ``*``; False otherwise (including when the header
            is absent or empty).
        """
        allowed_origin = next(
            (
                value
                for name, value in cors_headers.items()
                if name.lower() == "access-control-allow-origin"
            ),
            None,
        )
        if not allowed_origin:
            return False
        return allowed_origin in (origin, "*")

    async def validate_origin(
        self,
        origin: str,
        endpoint: str = "/health",
        method: str = "GET",
    ) -> Tuple[bool, Dict[str, str]]:
        """
        Validate if origin is allowed by CORS policy.

        Sends an ``OPTIONS`` preflight request for ``endpoint`` announcing
        ``method`` and the ``Content-Type``/``Authorization`` request headers,
        then inspects the ``Access-Control-*`` response headers.

        Args:
            origin: Value sent in the ``Origin`` request header.
            endpoint: Path appended to ``base_url``.
            method: Value sent in ``Access-Control-Request-Method``.

        Returns:
            Tuple of (is_allowed, cors_headers). On any error the failure is
            logged and ``(False, {})`` is returned instead of raising.
        """
        preflight_headers = {
            "Origin": origin,
            "User-Agent": "CORS-Validator/1.0",
            "Access-Control-Request-Method": method,
            "Access-Control-Request-Headers": "Content-Type, Authorization",
        }

        async with httpx.AsyncClient() as client:
            try:
                preflight_response = await client.options(
                    self._build_url(endpoint),
                    headers=preflight_headers,
                )

                cors_headers = self._extract_cors_headers(
                    preflight_response.headers
                )
                is_allowed = self._origin_allowed(origin, cors_headers)

                logger.info(
                    "cors_validation_result",
                    origin=origin,
                    endpoint=endpoint,
                    is_allowed=is_allowed,
                    cors_headers=cors_headers,
                )

                return is_allowed, cors_headers

            except Exception as e:
                # Deliberately broad: this is a diagnostic tool, so an
                # unreachable server or a malformed URL is reported as
                # "not allowed" rather than aborting the whole run.
                logger.error(
                    "cors_validation_error",
                    origin=origin,
                    endpoint=endpoint,
                    error=str(e),
                )
                return False, {}

    async def test_all_origins(self) -> Dict[str, Dict[str, Any]]:
        """Test all configured origins.

        Every concrete entry of ``settings.cors_origins`` is validated,
        followed by two sample Vercel preview URLs.

        Returns:
            Mapping of origin to ``{"allowed": bool, "headers": dict}``; the
            Vercel samples additionally carry a ``"note"`` entry.
        """
        results: Dict[str, Dict[str, Any]] = {}

        for origin in settings.cors_origins:
            # Wildcard entries are patterns, not origins a client could
            # actually send, so they cannot be probed directly.
            if "*" in origin:
                continue

            is_allowed, headers = await self.validate_origin(origin)
            results[origin] = {
                "allowed": is_allowed,
                "headers": headers,
            }

        vercel_test_origins = [
            "https://cidadao-ai-frontend-abc123-neural-thinker.vercel.app",
            "https://cidadao-ai-preview-xyz789-neural-thinker.vercel.app",
        ]

        for origin in vercel_test_origins:
            is_allowed, headers = await self.validate_origin(origin)
            results[origin] = {
                "allowed": is_allowed,
                "headers": headers,
                "note": "Vercel preview URL test",
            }

        return results

    def generate_nginx_config(self) -> str:
        """Generate nginx CORS configuration.

        Returns:
            A static configuration snippet as text.

        NOTE(review): the snippet echoes ``$http_origin`` back while also
        allowing credentials, which grants credentialed access to any origin
        unless the origin is restricted elsewhere in the server config.
        """
        config = """# CORS configuration for Cidadão.AI
# Add this to your nginx server block

# Handle preflight requests
if ($request_method = 'OPTIONS') {
    add_header 'Access-Control-Allow-Origin' '$http_origin' always;
    add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
    add_header 'Access-Control-Allow-Headers' 'Accept,Authorization,Cache-Control,Content-Type,DNT,If-Modified-Since,Keep-Alive,Origin,User-Agent,X-Requested-With,X-API-Key' always;
    add_header 'Access-Control-Allow-Credentials' 'true' always;
    add_header 'Access-Control-Max-Age' 86400 always;
    add_header 'Content-Length' 0;
    add_header 'Content-Type' 'text/plain; charset=UTF-8';
    return 204;
}

# Add CORS headers to responses
add_header 'Access-Control-Allow-Origin' '$http_origin' always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
add_header 'Access-Control-Expose-Headers' 'X-RateLimit-Limit,X-RateLimit-Remaining,X-RateLimit-Reset,X-Request-ID,X-Total-Count' always;
"""
        return config

    def generate_cloudflare_headers(self) -> List[Dict[str, Any]]:
        """Generate Cloudflare transform rules for CORS.

        Returns:
            Two rule dictionaries (Vercel origins, localhost development),
            each with ``"name"``, ``"expression"`` and a ``"headers"`` mapping.
        """

        def cors_response_headers() -> Dict[str, str]:
            # A fresh dict per rule so callers can edit one rule's headers
            # without affecting the other.
            return {
                "Access-Control-Allow-Origin": '${http.request.headers["origin"][0]}',
                "Access-Control-Allow-Credentials": "true",
                "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, PATCH, OPTIONS",
            }

        return [
            {
                "name": "CORS - Vercel Origins",
                "expression": 'http.request.headers["origin"][0] matches "^https://[a-zA-Z0-9-]+\\.vercel\\.app$"',
                "headers": cors_response_headers(),
            },
            {
                "name": "CORS - Localhost Development",
                "expression": 'http.request.headers["origin"][0] in {"http://localhost:3000", "http://127.0.0.1:3000"}',
                "headers": cors_response_headers(),
            },
        ]

    async def test_credentials_flow(
        self,
        origin: str = "https://cidadao-ai-frontend.vercel.app",
    ) -> Dict[str, Any]:
        """Test CORS with credentials (cookies/auth).

        Performs a login ``POST`` and an authenticated ``GET`` with the given
        ``Origin`` and records the status code and CORS headers of each
        response.

        Args:
            origin: Value sent in the ``Origin`` request header.

        Returns:
            ``{"origin": origin, "tests": {...}}`` where ``tests`` has the
            keys ``"login"`` and ``"authenticated"``. A request that raises
            is recorded as ``{"error": "<message>"}`` instead of propagating.
        """
        results: Dict[str, Any] = {
            "origin": origin,
            "tests": {},
        }

        async with httpx.AsyncClient() as client:
            try:
                # NOTE(review): the email literal below looks like a redacted
                # placeholder rather than a real address - confirm the
                # intended test account.
                response = await client.post(
                    self._build_url("/api/v1/auth/login"),
                    headers={"Origin": origin},
                    json={"email": "[email protected]", "password": "test"},
                    follow_redirects=False,
                )

                results["tests"]["login"] = {
                    "status": response.status_code,
                    "cors_origin": response.headers.get("Access-Control-Allow-Origin"),
                    "cors_credentials": response.headers.get("Access-Control-Allow-Credentials"),
                    "has_cookies": "Set-Cookie" in response.headers,
                }
            except Exception as e:
                results["tests"]["login"] = {"error": str(e)}

            try:
                response = await client.get(
                    self._build_url("/api/v1/chat/history"),
                    headers={
                        "Origin": origin,
                        "Authorization": "Bearer test-token",
                    },
                )

                results["tests"]["authenticated"] = {
                    "status": response.status_code,
                    "cors_origin": response.headers.get("Access-Control-Allow-Origin"),
                    "cors_credentials": response.headers.get("Access-Control-Allow-Credentials"),
                }
            except Exception as e:
                results["tests"]["authenticated"] = {"error": str(e)}

        return results
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Run CORS validation tests.

    Prints, in order: the per-origin validation results, the credentials-flow
    results as JSON, the generated nginx configuration, and the Cloudflare
    transform rules as JSON.
    """
    import json

    validator = CORSValidator()

    print("🔍 Testing CORS configuration...\n")

    print("1. Testing configured origins:")
    results = await validator.test_all_origins()
    for origin, result in results.items():
        status = "✅" if result["allowed"] else "❌"
        print(f" {status} {origin}")

    print("\n2. Testing credentials flow:")
    creds_results = await validator.test_credentials_flow()
    print(json.dumps(creds_results, indent=2))

    print("\n3. Generated nginx configuration:")
    print(validator.generate_nginx_config())

    print("\n4. Cloudflare transform rules:")
    cf_rules = validator.generate_cloudflare_headers()
    print(json.dumps(cf_rules, indent=2))
|
|
|
|
|
|
|
|
# Script entry point: run the validation suite when executed directly.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())