| """ |
| Module: api.middleware.authentication |
| Description: Authentication middleware for API endpoints |
| Author: Anderson H. Silva |
| Date: 2025-01-24 |
| License: Proprietary - All rights reserved |
| """ |
|
|
| from typing import Optional |
| from datetime import datetime, timedelta |
|
|
| from fastapi import Request, HTTPException, Depends |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| import jwt |
|
|
| from src.core import get_logger, settings |
|
|
|
|
| class AuthenticationMiddleware: |
| """Authentication middleware for API endpoints.""" |
| |
| def __init__(self): |
| """Initialize authentication middleware.""" |
| self.logger = get_logger(__name__) |
| self.security = HTTPBearer(auto_error=False) |
| |
| async def __call__(self, request: Request): |
| """Authenticate request.""" |
| |
| if request.url.path in ["/health", "/health/", "/docs", "/openapi.json", "/"]: |
| return True |
| |
| |
| api_key = request.headers.get("X-API-Key") |
| if api_key: |
| return await self._validate_api_key(api_key, request) |
| |
| |
| auth_header = request.headers.get("Authorization") |
| if auth_header and auth_header.startswith("Bearer "): |
| token = auth_header[7:] |
| return await self._validate_jwt_token(token, request) |
| |
| |
| if settings.app_env == "development": |
| self.logger.warning( |
| "unauthenticated_request_allowed", |
| path=request.url.path, |
| method=request.method, |
| environment="development" |
| ) |
| return True |
| |
| |
| self.logger.warning( |
| "unauthenticated_request_rejected", |
| path=request.url.path, |
| method=request.method, |
| ) |
| |
| raise HTTPException( |
| status_code=401, |
| detail="Authentication required", |
| headers={"WWW-Authenticate": "Bearer"} |
| ) |
| |
| async def _validate_api_key(self, api_key: str, request: Request) -> bool: |
| """Validate API key.""" |
| |
| |
| |
| if not api_key or len(api_key) < 32: |
| self.logger.warning( |
| "invalid_api_key_format", |
| api_key_length=len(api_key) if api_key else 0, |
| path=request.url.path, |
| ) |
| raise HTTPException( |
| status_code=401, |
| detail="Invalid API key format" |
| ) |
| |
| |
| |
| self.logger.info( |
| "api_key_authentication_success", |
| path=request.url.path, |
| method=request.method, |
| ) |
| |
| return True |
| |
| async def _validate_jwt_token(self, token: str, request: Request) -> bool: |
| """Validate JWT token.""" |
| try: |
| |
| payload = jwt.decode( |
| token, |
| settings.jwt_secret_key.get_secret_value(), |
| algorithms=[settings.jwt_algorithm] |
| ) |
| |
| |
| exp = payload.get("exp") |
| if exp and datetime.utcnow().timestamp() > exp: |
| raise HTTPException( |
| status_code=401, |
| detail="Token has expired" |
| ) |
| |
| |
| request.state.user_id = payload.get("sub") |
| request.state.user_email = payload.get("email") |
| request.state.user_roles = payload.get("roles", []) |
| |
| self.logger.info( |
| "jwt_authentication_success", |
| user_id=request.state.user_id, |
| path=request.url.path, |
| method=request.method, |
| ) |
| |
| return True |
| |
| except jwt.ExpiredSignatureError: |
| self.logger.warning( |
| "jwt_token_expired", |
| path=request.url.path, |
| ) |
| raise HTTPException( |
| status_code=401, |
| detail="Token has expired" |
| ) |
| except jwt.JWTError as e: |
| self.logger.warning( |
| "jwt_validation_failed", |
| error=str(e), |
| path=request.url.path, |
| ) |
| raise HTTPException( |
| status_code=401, |
| detail="Invalid token" |
| ) |
|
|
|
|
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: |
| """Create JWT access token.""" |
| to_encode = data.copy() |
| |
| if expires_delta: |
| expire = datetime.utcnow() + expires_delta |
| else: |
| expire = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes) |
| |
| to_encode.update({"exp": expire.timestamp()}) |
| |
| encoded_jwt = jwt.encode( |
| to_encode, |
| settings.jwt_secret_key.get_secret_value(), |
| algorithm=settings.jwt_algorithm |
| ) |
| |
| return encoded_jwt |
|
|
|
|
| def get_current_user(request: Request) -> dict: |
| """Get current authenticated user.""" |
| return { |
| "user_id": getattr(request.state, "user_id", None), |
| "email": getattr(request.state, "user_email", None), |
| "roles": getattr(request.state, "user_roles", []), |
| } |