|
|
""" |
|
|
Authentication routes for Cidadão.AI API |
|
|
""" |
|
|
|
|
|
from datetime import datetime |
|
|
from typing import Optional |
|
|
from fastapi import APIRouter, Depends, HTTPException, status |
|
|
from fastapi.security import HTTPAuthorizationCredentials |
|
|
from pydantic import BaseModel, EmailStr |
|
|
|
|
|
from ..auth import auth_manager, get_current_user, require_admin, security, User |
|
|
|
|
|
router = APIRouter(prefix="/auth", tags=["authentication"]) |
|
|
|
|
|
|
|
|
class LoginRequest(BaseModel): |
|
|
email: EmailStr |
|
|
password: str |
|
|
|
|
|
class LoginResponse(BaseModel): |
|
|
access_token: str |
|
|
refresh_token: str |
|
|
token_type: str = "bearer" |
|
|
expires_in: int |
|
|
user: dict |
|
|
|
|
|
class RefreshRequest(BaseModel): |
|
|
refresh_token: str |
|
|
|
|
|
class RefreshResponse(BaseModel): |
|
|
access_token: str |
|
|
token_type: str = "bearer" |
|
|
expires_in: int |
|
|
|
|
|
class RegisterRequest(BaseModel): |
|
|
email: EmailStr |
|
|
password: str |
|
|
name: str |
|
|
role: Optional[str] = "analyst" |
|
|
|
|
|
class ChangePasswordRequest(BaseModel): |
|
|
old_password: str |
|
|
new_password: str |
|
|
|
|
|
class UserResponse(BaseModel): |
|
|
id: str |
|
|
email: str |
|
|
name: str |
|
|
role: str |
|
|
is_active: bool |
|
|
created_at: datetime |
|
|
last_login: Optional[datetime] = None |
|
|
|
|
|
@router.post("/login", response_model=LoginResponse) |
|
|
async def login(request: LoginRequest): |
|
|
""" |
|
|
Authenticate user and return JWT tokens |
|
|
""" |
|
|
user = auth_manager.authenticate_user(request.email, request.password) |
|
|
|
|
|
if not user: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid credentials", |
|
|
headers={"WWW-Authenticate": "Bearer"} |
|
|
) |
|
|
|
|
|
access_token = auth_manager.create_access_token(user) |
|
|
refresh_token = auth_manager.create_refresh_token(user) |
|
|
|
|
|
return LoginResponse( |
|
|
access_token=access_token, |
|
|
refresh_token=refresh_token, |
|
|
expires_in=auth_manager.access_token_expire_minutes * 60, |
|
|
user={ |
|
|
"id": user.id, |
|
|
"email": user.email, |
|
|
"name": user.name, |
|
|
"role": user.role, |
|
|
"is_active": user.is_active |
|
|
} |
|
|
) |
|
|
|
|
|
@router.post("/refresh", response_model=RefreshResponse) |
|
|
async def refresh_token(request: RefreshRequest): |
|
|
""" |
|
|
Refresh access token using refresh token |
|
|
""" |
|
|
try: |
|
|
new_access_token = auth_manager.refresh_access_token(request.refresh_token) |
|
|
|
|
|
return RefreshResponse( |
|
|
access_token=new_access_token, |
|
|
expires_in=auth_manager.access_token_expire_minutes * 60 |
|
|
) |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid refresh token" |
|
|
) |
|
|
|
|
|
@router.post("/register", response_model=UserResponse) |
|
|
async def register( |
|
|
request: RegisterRequest, |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
""" |
|
|
Register new user (admin only) |
|
|
""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
try: |
|
|
user = auth_manager.register_user( |
|
|
email=request.email, |
|
|
password=request.password, |
|
|
name=request.name, |
|
|
role=request.role |
|
|
) |
|
|
|
|
|
return UserResponse( |
|
|
id=user.id, |
|
|
email=user.email, |
|
|
name=user.name, |
|
|
role=user.role, |
|
|
is_active=user.is_active, |
|
|
created_at=user.created_at, |
|
|
last_login=user.last_login |
|
|
) |
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to register user: {str(e)}" |
|
|
) |
|
|
|
|
|
@router.get("/me", response_model=UserResponse) |
|
|
async def get_current_user_info(current_user: User = Depends(get_current_user)): |
|
|
""" |
|
|
Get current user information |
|
|
""" |
|
|
return UserResponse( |
|
|
id=current_user.id, |
|
|
email=current_user.email, |
|
|
name=current_user.name, |
|
|
role=current_user.role, |
|
|
is_active=current_user.is_active, |
|
|
created_at=current_user.created_at, |
|
|
last_login=current_user.last_login |
|
|
) |
|
|
|
|
|
@router.post("/change-password") |
|
|
async def change_password( |
|
|
request: ChangePasswordRequest, |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
""" |
|
|
Change current user password |
|
|
""" |
|
|
try: |
|
|
success = auth_manager.change_password( |
|
|
user_id=current_user.id, |
|
|
old_password=request.old_password, |
|
|
new_password=request.new_password |
|
|
) |
|
|
|
|
|
if success: |
|
|
return {"message": "Password changed successfully"} |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Failed to change password" |
|
|
) |
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to change password: {str(e)}" |
|
|
) |
|
|
|
|
|
@router.post("/logout") |
|
|
async def logout(current_user: User = Depends(get_current_user)): |
|
|
""" |
|
|
Logout user (client should discard tokens) |
|
|
""" |
|
|
|
|
|
return {"message": "Logged out successfully"} |
|
|
|
|
|
@router.get("/users", response_model=list[UserResponse]) |
|
|
async def list_users(current_user: User = Depends(get_current_user)): |
|
|
""" |
|
|
List all users (admin only) |
|
|
""" |
|
|
require_admin(current_user) |
|
|
|
|
|
users = auth_manager.get_all_users() |
|
|
|
|
|
return [ |
|
|
UserResponse( |
|
|
id=user.id, |
|
|
email=user.email, |
|
|
name=user.name, |
|
|
role=user.role, |
|
|
is_active=user.is_active, |
|
|
created_at=user.created_at, |
|
|
last_login=user.last_login |
|
|
) for user in users |
|
|
] |
|
|
|
|
|
@router.post("/users/{user_id}/deactivate") |
|
|
async def deactivate_user( |
|
|
user_id: str, |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
""" |
|
|
Deactivate user account (admin only) |
|
|
""" |
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
if user_id == current_user.id: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Cannot deactivate your own account" |
|
|
) |
|
|
|
|
|
try: |
|
|
success = auth_manager.deactivate_user(user_id) |
|
|
if success: |
|
|
return {"message": "User deactivated successfully"} |
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to deactivate user: {str(e)}" |
|
|
) |
|
|
|
|
|
@router.post("/verify") |
|
|
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): |
|
|
""" |
|
|
Verify if token is valid |
|
|
""" |
|
|
try: |
|
|
user = auth_manager.get_current_user(credentials.credentials) |
|
|
return { |
|
|
"valid": True, |
|
|
"user": { |
|
|
"id": user.id, |
|
|
"email": user.email, |
|
|
"name": user.name, |
|
|
"role": user.role |
|
|
} |
|
|
} |
|
|
except HTTPException: |
|
|
return {"valid": False} |