| """ |
| HashiCorp Vault Client for Cidadão.AI |
| Production-grade secret management with fallback strategies |
| """ |
|
|
| import os |
| import time |
| import asyncio |
| import httpx |
| from typing import Dict, Any, Optional, List, Union |
| from datetime import datetime, timedelta |
| from dataclasses import dataclass, field |
| from enum import Enum |
| import structlog |
| from pathlib import Path |
| from src.core import json_utils |
|
|
# Module-level structured logger shared by everything defined in this file.
logger = structlog.get_logger(__name__)
|
|
|
|
class VaultStatus(Enum):
    """Connection state of the Vault backend as tracked by the client."""

    # Vault is reachable and usable.
    HEALTHY = "healthy"
    # Vault answered, but is not fully operational.
    DEGRADED = "degraded"
    # Vault could not be reached or initialization failed.
    UNAVAILABLE = "unavailable"
    # Initial state, before any connection attempt has been made.
    NOT_CONFIGURED = "not_configured"
|
|
|
|
@dataclass
class VaultConfig:
    """Settings controlling how the Vault client connects, caches and fails over."""

    # --- Connection ---
    url: str = "http://localhost:8200"
    token: Optional[str] = None
    namespace: Optional[str] = None  # sent as the X-Vault-Namespace header when set
    timeout: int = 10  # HTTP timeout handed to the httpx client

    # --- Authentication: "token" or "approle" ---
    auth_method: str = "token"
    role_id: Optional[str] = None
    secret_id: Optional[str] = None

    # --- Mount paths ---
    secret_path: str = "secret/cidadao-ai"
    transit_path: str = "transit"

    # --- Secret cache ---
    cache_ttl: int = 300  # seconds a cached secret stays valid
    max_cache_size: int = 1000  # entries kept before least-recently-used eviction

    # --- Retry and circuit breaker ---
    max_retries: int = 3
    retry_delay: float = 1.0  # base delay in seconds, doubled on every retry
    circuit_breaker_threshold: int = 5  # failures needed to open the breaker
    circuit_breaker_timeout: int = 60  # seconds the breaker stays open

    # --- Fallback behaviour ---
    fallback_to_env: bool = True  # look the key up in os.environ when Vault has no value
    require_vault: bool = False  # raise instead of degrading when initialization fails
|
|
|
|
@dataclass
class SecretEntry:
    """A secret held in the in-memory cache, plus its bookkeeping data."""

    value: Any
    created_at: datetime  # naive UTC timestamp; compared against datetime.utcnow()
    ttl: int  # lifetime in seconds, counted from created_at
    last_accessed: datetime = field(default_factory=datetime.utcnow)
    access_count: int = 0

    @property
    def is_expired(self) -> bool:
        """Return True once more than ``ttl`` seconds have passed since creation."""
        expires_at = self.created_at + timedelta(seconds=self.ttl)
        return expires_at < datetime.utcnow()

    def touch(self):
        """Record one more access and refresh the last-access timestamp."""
        self.access_count = self.access_count + 1
        self.last_accessed = datetime.utcnow()
|
|
|
|
class VaultClientError(Exception):
    """Root of the Vault client exception hierarchy."""
|
|
|
|
class VaultUnavailableError(VaultClientError):
    """Raised when the Vault service cannot be used."""
|
|
|
|
class VaultAuthError(VaultClientError):
    """Raised when authenticating against Vault fails or access is denied."""
|
|
|
|
class VaultCircuitBreakerError(VaultClientError):
    """Raised when a call is rejected because the circuit breaker is open."""
|
|
|
|
class VaultClient:
    """
    Asynchronous HashiCorp Vault client with:
    - In-memory secret cache with per-entry TTL and a size cap
    - Circuit breaker that stops calling Vault after repeated failures
    - Optional fallback to environment variables
    - Structured logging of every significant event
    - Status tracking through ``VaultStatus``
    """

    def __init__(self, config: Optional["VaultConfig"] = None):
        """
        Create a client. No network traffic happens until ``initialize()``.

        Args:
            config: Explicit configuration; when omitted it is read from
                ``VAULT_*`` environment variables.
        """
        self.config = config or self._load_config()
        self._client: Optional[httpx.AsyncClient] = None

        # Secret cache and its counters.
        self._cache: Dict[str, SecretEntry] = {}
        self._cache_stats = {"hits": 0, "misses": 0, "evictions": 0}

        # Circuit breaker state.
        self._circuit_breaker_failures = 0
        self._circuit_breaker_last_failure: Optional[datetime] = None
        self._circuit_breaker_open = False

        # Health tracking.
        self._status = VaultStatus.NOT_CONFIGURED
        self._last_health_check: Optional[datetime] = None
        self._health_check_interval = 30

        # Authentication state.
        self._auth_token: Optional[str] = None
        self._auth_expires: Optional[datetime] = None

        logger.info(
            "vault_client_initialized",
            vault_url=self.config.url,
            auth_method=self.config.auth_method,
            fallback_enabled=self.config.fallback_to_env,
            cache_ttl=self.config.cache_ttl,
        )

    @classmethod
    def _load_config(cls) -> "VaultConfig":
        """Build a ``VaultConfig`` from ``VAULT_*`` environment variables."""
        return VaultConfig(
            url=os.getenv("VAULT_URL", "http://localhost:8200"),
            token=os.getenv("VAULT_TOKEN"),
            namespace=os.getenv("VAULT_NAMESPACE"),
            timeout=int(os.getenv("VAULT_TIMEOUT", "10")),
            auth_method=os.getenv("VAULT_AUTH_METHOD", "token"),
            role_id=os.getenv("VAULT_ROLE_ID"),
            secret_id=os.getenv("VAULT_SECRET_ID"),
            secret_path=os.getenv("VAULT_SECRET_PATH", "secret/cidadao-ai"),
            cache_ttl=int(os.getenv("VAULT_CACHE_TTL", "300")),
            fallback_to_env=os.getenv("VAULT_FALLBACK_TO_ENV", "true").lower() == "true",
            require_vault=os.getenv("VAULT_REQUIRE", "false").lower() == "true",
        )

    async def __aenter__(self):
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def initialize(self):
        """
        Open the HTTP client, authenticate and probe Vault's health.

        Raises:
            VaultUnavailableError: If any step fails and
                ``config.require_vault`` is set. Otherwise the failure is
                logged and the client is marked ``UNAVAILABLE``.
        """
        try:
            self._client = httpx.AsyncClient(
                timeout=self.config.timeout,
                headers=(
                    {"X-Vault-Namespace": self.config.namespace}
                    if self.config.namespace
                    else {}
                ),
            )

            await self._authenticate()

            # _health_check() marks the client HEALTHY on success. When the
            # probe fails after a successful login, Vault is reachable but
            # not fully healthy: report DEGRADED (secrets are still fetched)
            # instead of unconditionally claiming HEALTHY.
            if not await self._health_check():
                self._status = VaultStatus.DEGRADED

            logger.info(
                "vault_client_connected",
                vault_url=self.config.url,
                status=self._status.value,
            )

        except Exception as e:
            logger.error(
                "vault_client_initialization_failed",
                error=str(e),
                vault_url=self.config.url,
            )
            self._status = VaultStatus.UNAVAILABLE

            if self.config.require_vault:
                # __aexit__ never runs when __aenter__ raises, so release the
                # HTTP client here to avoid leaking the connection pool.
                await self.close()
                raise VaultUnavailableError(
                    f"Vault initialization failed: {e}"
                ) from e

            logger.warning(
                "vault_fallback_mode_enabled",
                reason="initialization_failed",
            )

    async def close(self):
        """Close the underlying HTTP client, if one is open."""
        if self._client:
            await self._client.aclose()
            self._client = None

        logger.info("vault_client_closed")

    async def _authenticate(self):
        """
        Authenticate using the configured method ("token" or "approle").

        Raises:
            VaultClientError: If the HTTP client has not been created.
            VaultAuthError: If credentials are missing, rejected, or the
                auth method is not supported.
        """
        if not self._client:
            raise VaultClientError("Client not initialized")

        method = self.config.auth_method
        if method == "token":
            await self._authenticate_token()
        elif method == "approle":
            await self._authenticate_approle()
        else:
            raise VaultAuthError(f"Unsupported auth method: {method}")

    async def _authenticate_token(self):
        """Validate the configured static token via ``auth/token/lookup-self``."""
        if not self.config.token:
            raise VaultAuthError("Vault token not provided")

        self._auth_token = self.config.token

        response = await self._client.get(
            f"{self.config.url}/v1/auth/token/lookup-self",
            headers={"X-Vault-Token": self._auth_token},
        )
        if response.status_code != 200:
            raise VaultAuthError(f"Token validation failed: {response.status_code}")

        # A static token cannot be re-issued by this client, so its expiry is
        # only surfaced in the log rather than tracked for re-authentication.
        token_data = response.json().get("data") or {}
        logger.info(
            "vault_token_authenticated",
            expire_time=token_data.get("expire_time"),
        )

    async def _authenticate_approle(self):
        """Log in with AppRole credentials and remember when the token expires."""
        if not self.config.role_id or not self.config.secret_id:
            raise VaultAuthError("AppRole credentials not provided")

        response = await self._client.post(
            f"{self.config.url}/v1/auth/approle/login",
            json={
                "role_id": self.config.role_id,
                "secret_id": self.config.secret_id,
            },
        )
        if response.status_code != 200:
            raise VaultAuthError(f"AppRole login failed: {response.status_code}")

        auth_data = response.json()["auth"]
        self._auth_token = auth_data["client_token"]

        lease_duration = auth_data.get("lease_duration")
        if lease_duration:
            self._auth_expires = datetime.utcnow() + timedelta(seconds=lease_duration)

        logger.info(
            "vault_approle_authenticated",
            lease_duration=auth_data.get("lease_duration", 0),
        )

    async def _health_check(self) -> bool:
        """
        Probe ``/v1/sys/health`` and update ``self._status`` accordingly.

        Returns:
            True only when Vault answers 200 and reports itself unsealed.
            A healthy answer also resets the circuit breaker.
        """
        if not self._client:
            return False

        try:
            response = await self._client.get(f"{self.config.url}/v1/sys/health")

            if response.status_code == 200:
                # A missing "sealed" field is treated as sealed.
                is_healthy = not response.json().get("sealed", True)

                if is_healthy:
                    self._status = VaultStatus.HEALTHY
                    self._circuit_breaker_failures = 0
                    self._circuit_breaker_open = False
                else:
                    self._status = VaultStatus.DEGRADED

                self._last_health_check = datetime.utcnow()
                return is_healthy

        except Exception as e:
            logger.warning("vault_health_check_failed", error=str(e))

        # Non-200 answer or transport failure.
        self._status = VaultStatus.UNAVAILABLE
        return False

    def _is_circuit_breaker_open(self) -> bool:
        """
        Report whether calls to Vault must currently be rejected.

        Once ``circuit_breaker_timeout`` seconds have passed since the last
        recorded failure the breaker closes again. The failure counter is
        kept, so a single further failure re-opens it; a successful call
        resets the counter.
        """
        if not self._circuit_breaker_open:
            return False

        last_failure = self._circuit_breaker_last_failure
        if last_failure is None:
            return True

        reopen_at = last_failure + timedelta(
            seconds=self.config.circuit_breaker_timeout
        )
        if datetime.utcnow() > reopen_at:
            self._circuit_breaker_open = False
            logger.info("vault_circuit_breaker_closed")
            return False

        return True

    def _record_failure(self):
        """Count a failed Vault call and open the breaker at the threshold."""
        self._circuit_breaker_failures += 1
        self._circuit_breaker_last_failure = datetime.utcnow()

        if self._circuit_breaker_failures >= self.config.circuit_breaker_threshold:
            self._circuit_breaker_open = True
            logger.warning(
                "vault_circuit_breaker_opened",
                failure_count=self._circuit_breaker_failures,
            )

    def _record_success(self):
        """Reset the failure counter after Vault answered a request."""
        # Without this, isolated failures spread over the process lifetime
        # would eventually add up to the threshold and open the breaker.
        self._circuit_breaker_failures = 0

    async def get_secret(self, key: str, version: Optional[int] = None) -> Optional[str]:
        """
        Get a secret value, consulting the cache, then Vault, then the environment.

        Args:
            key: Secret key name
            version: KV version (for versioned secrets)

        Returns:
            Secret value or None if not found
        """
        cache_key = f"{key}:{version}" if version else key

        entry = self._cache.get(cache_key)
        if entry is not None:
            if not entry.is_expired:
                entry.touch()
                self._cache_stats["hits"] += 1

                logger.debug(
                    "vault_secret_cache_hit",
                    key=key,
                    version=version,
                    access_count=entry.access_count,
                )
                return entry.value

            # Expired: drop it and treat the lookup as a miss.
            del self._cache[cache_key]

        self._cache_stats["misses"] += 1

        if self._status in (VaultStatus.HEALTHY, VaultStatus.DEGRADED):
            try:
                value = await self._fetch_from_vault(key, version)
            except VaultCircuitBreakerError:
                # The breaker rejected the call before any request was sent.
                # Counting this as a failure would refresh the last-failure
                # timestamp on every call and keep the breaker open forever
                # under steady traffic.
                logger.debug("vault_circuit_breaker_rejected", key=key)
            except Exception as e:
                logger.error(
                    "vault_secret_fetch_failed",
                    key=key,
                    error=str(e),
                )
                self._record_failure()
            else:
                if value is not None:
                    self._cache[cache_key] = SecretEntry(
                        value=value,
                        created_at=datetime.utcnow(),
                        ttl=self.config.cache_ttl,
                    )
                    await self._cleanup_cache()

                    logger.info(
                        "vault_secret_retrieved",
                        key=key,
                        version=version,
                        source="vault",
                    )
                    return value

        if self.config.fallback_to_env:
            # "db/pass-word" is looked up as DB_PASS_WORD.
            env_value = os.getenv(key.upper().replace("-", "_").replace("/", "_"))
            if env_value:
                logger.info(
                    "vault_secret_retrieved",
                    key=key,
                    source="environment",
                )
                return env_value

        logger.warning(
            "vault_secret_not_found",
            key=key,
            version=version,
            vault_status=self._status.value,
        )
        return None

    async def _fetch_from_vault(self, key: str, version: Optional[int] = None) -> Optional[str]:
        """
        Read one secret from Vault, retrying transport errors with backoff.

        Returns:
            The secret value, or None when Vault answers 404 or the payload
            carries no data.

        Raises:
            VaultCircuitBreakerError: If the circuit breaker is open.
            VaultAuthError: If Vault answers 403.
            VaultClientError: If the client is not authenticated, Vault
                returns an unexpected status, or every retry failed.
        """
        if self._is_circuit_breaker_open():
            raise VaultCircuitBreakerError("Circuit breaker is open")

        if not self._client or not self._auth_token:
            raise VaultClientError("Client not authenticated")

        # Refresh an expired login before building the request headers.
        if self._auth_expires and datetime.utcnow() > self._auth_expires:
            await self._authenticate()

        url = f"{self.config.url}/v1/{self.config.secret_path}/data/{key}"
        params = {"version": str(version)} if version else {}
        headers = {"X-Vault-Token": self._auth_token}

        for attempt in range(self.config.max_retries):
            try:
                response = await self._client.get(url, headers=headers, params=params)
            except httpx.RequestError as e:
                if attempt == self.config.max_retries - 1:
                    raise VaultClientError(f"Network error: {e}") from e

                # Exponential backoff: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                continue

            status = response.status_code
            if status == 404:
                # Vault answered; the secret simply does not exist.
                self._record_success()
                return None
            if status == 403:
                raise VaultAuthError("Access denied to secret")
            if status != 200:
                raise VaultClientError(f"Vault API error: {status}")

            self._record_success()
            return self._extract_secret_value(response.json())

        # Only reachable when max_retries is 0.
        raise VaultClientError("Max retries exceeded")

    @staticmethod
    def _extract_secret_value(payload: Dict[str, Any]) -> Optional[str]:
        """
        Pull the secret out of a Vault read response.

        A nested ``data.data`` payload is unwrapped first; otherwise the
        top-level ``data`` object is used. A dict payload yields its
        ``"value"`` field when present, else the whole dict serialized as
        JSON. A null or missing payload yields None.
        """
        outer = payload.get("data")
        if isinstance(outer, dict) and "data" in outer:
            secret_data = outer["data"]
        else:
            secret_data = outer

        if secret_data is None:
            # Avoid handing back the literal string "None" as a secret.
            return None

        if isinstance(secret_data, dict):
            # Test for the key rather than truthiness so that an empty
            # string stored under "value" is returned as-is.
            if "value" in secret_data:
                return secret_data["value"]
            return json_utils.dumps(secret_data)

        return str(secret_data)

    async def _cleanup_cache(self):
        """Drop expired entries, then evict least-recently-used ones over the cap."""
        expired_keys = [
            cache_key for cache_key, entry in self._cache.items() if entry.is_expired
        ]
        for cache_key in expired_keys:
            del self._cache[cache_key]
        self._cache_stats["evictions"] += len(expired_keys)

        overflow = len(self._cache) - self.config.max_cache_size
        if overflow > 0:
            oldest_first = sorted(
                self._cache,
                key=lambda cache_key: self._cache[cache_key].last_accessed,
            )
            for cache_key in oldest_first[:overflow]:
                del self._cache[cache_key]
            self._cache_stats["evictions"] += overflow

    def _invalidate_cache(self, key: str):
        """Remove the cached latest value of ``key`` and all its cached versions."""
        # Cache keys are either "<key>" or "<key>:<version>". A bare
        # startswith(key) would also evict unrelated keys sharing the prefix.
        versioned_prefix = f"{key}:"
        stale_keys = [
            cache_key
            for cache_key in self._cache
            if cache_key == key or cache_key.startswith(versioned_prefix)
        ]
        for cache_key in stale_keys:
            del self._cache[cache_key]

    async def set_secret(self, key: str, value: str, metadata: Optional[Dict] = None) -> bool:
        """
        Store a secret in Vault and invalidate its cached copies.

        Args:
            key: Secret key name.
            value: Secret value, stored under the ``"value"`` field.
            metadata: Extra fields stored alongside the value.

        Returns:
            True when Vault answers 200 or 204, False on any other status
            or on an error while sending the request.

        Raises:
            VaultCircuitBreakerError: If the circuit breaker is open.
            VaultClientError: If the client is not authenticated.
        """
        if self._is_circuit_breaker_open():
            raise VaultCircuitBreakerError("Circuit breaker is open")

        if not self._client or not self._auth_token:
            raise VaultClientError("Client not authenticated")

        url = f"{self.config.url}/v1/{self.config.secret_path}/data/{key}"

        # "value" is written last so a metadata field of the same name can
        # never replace the secret being stored.
        payload = {"data": {**(metadata or {}), "value": value}}

        try:
            # Same expiry handling as the read path.
            if self._auth_expires and datetime.utcnow() > self._auth_expires:
                await self._authenticate()

            headers = {"X-Vault-Token": self._auth_token}
            response = await self._client.post(url, headers=headers, json=payload)
        except Exception as e:
            logger.error("vault_secret_store_error", key=key, error=str(e))
            self._record_failure()
            return False

        if response.status_code not in (200, 204):
            logger.error(
                "vault_secret_store_failed",
                key=key,
                status_code=response.status_code,
            )
            return False

        self._invalidate_cache(key)
        self._record_success()
        logger.info("vault_secret_stored", key=key)
        return True

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of status, cache, circuit-breaker and config data."""
        last_failure = self._circuit_breaker_last_failure
        last_check = self._last_health_check
        return {
            "status": self._status.value,
            # Copy so callers cannot mutate the live counters.
            "cache_stats": dict(self._cache_stats),
            "cache_size": len(self._cache),
            "circuit_breaker": {
                "open": self._circuit_breaker_open,
                "failures": self._circuit_breaker_failures,
                "last_failure": last_failure.isoformat() if last_failure else None,
            },
            "last_health_check": last_check.isoformat() if last_check else None,
            "config": {
                "url": self.config.url,
                "auth_method": self.config.auth_method,
                "cache_ttl": self.config.cache_ttl,
                "fallback_enabled": self.config.fallback_to_env,
            },
        }
|
|
|
|
| |
# Process-wide client instance, created lazily by get_vault_client().
_vault_client: Optional[VaultClient] = None


async def get_vault_client(config: Optional[VaultConfig] = None) -> VaultClient:
    """
    Get the global Vault client, creating and initializing it on first use.

    Args:
        config: Configuration for the client; only used when the global
            instance does not exist yet.

    Returns:
        The shared, initialized ``VaultClient``.

    Raises:
        VaultUnavailableError: Propagated from ``VaultClient.initialize()``;
            in that case nothing is cached, so a later call retries.
    """
    global _vault_client

    if _vault_client is None:
        # Publish the instance only after initialize() succeeded. Assigning
        # it first would hand an uninitialized client to concurrent callers
        # and keep a broken one cached if initialization raised.
        client = VaultClient(config)
        try:
            await client.initialize()
        except Exception:
            await client.close()
            raise

        if _vault_client is None:
            _vault_client = client
        else:
            # Another task finished first while this one was awaiting.
            await client.close()

    return _vault_client
|
|
|
|
async def close_vault_client():
    """Shut down the global Vault client, if one exists, and forget it."""
    global _vault_client

    if not _vault_client:
        return

    await _vault_client.close()
    _vault_client = None