""" Main settings module for BackgroundFX Pro. Automatically loads the appropriate configuration based on environment. """ import os from pathlib import Path from typing import Optional, Dict, Any from dataclasses import dataclass, field import logging from .base import BaseConfig, config_manager, ConfigurationError from .validators import validate_production_config logger = logging.getLogger(__name__) @dataclass class Settings(BaseConfig): """ Main application settings with all configuration options. """ # API Configuration api_version: str = "v1" api_prefix: str = "/api" api_title: str = "BackgroundFX Pro API" api_description: str = "AI-powered background removal and replacement" api_docs_enabled: bool = True # Database Configuration database_url: str = field( default_factory=lambda: os.getenv( "DATABASE_URL", "postgresql://user:password@localhost/backgroundfx" ) ) database_pool_size: int = 20 database_max_overflow: int = 40 database_echo: bool = False # MongoDB Configuration mongodb_url: str = field( default_factory=lambda: os.getenv( "MONGODB_URL", "mongodb://localhost:27017/backgroundfx" ) ) mongodb_database: str = "backgroundfx" # Redis Configuration redis_url: str = field( default_factory=lambda: os.getenv( "REDIS_URL", "redis://localhost:6379/0" ) ) redis_max_connections: int = 50 redis_decode_responses: bool = True # Storage Configuration storage_backend: str = "s3" # Options: s3, local, gcs, azure storage_bucket: str = "backgroundfx-uploads" storage_region: str = "us-east-1" storage_access_key: str = field(default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID", "")) storage_secret_key: str = field(default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY", "")) storage_endpoint_url: Optional[str] = None # For S3-compatible services storage_use_ssl: bool = True local_storage_path: Path = field(default_factory=lambda: Path("storage")) # Processing Configuration max_image_size: int = 50 * 1024 * 1024 # 50MB max_video_size: int = 500 * 1024 * 1024 # 500MB max_batch_size: int = 100 processing_timeout: int = 300 # seconds enable_gpu: bool = True gpu_memory_fraction: float = 0.8 # Model Configuration models_dir: Path = field(default_factory=lambda: Path("models")) model_cache_dir: Path = field(default_factory=lambda: Path("/tmp/model_cache")) default_model: str = "rembg" model_download_timeout: int = 600 # seconds model_configs: Dict[str, Dict[str, Any]] = field(default_factory=lambda: { "rembg": { "path": "models/rembg", "version": "1.0.0", "gpu_enabled": True, "batch_size": 4 }, "u2net": { "path": "models/u2net", "version": "1.0.0", "gpu_enabled": True, "batch_size": 2 }, "sam2": { "path": "models/sam2", "version": "2.0.0", "gpu_enabled": True, "batch_size": 1 } }) # Queue Configuration celery_broker_url: str = field( default_factory=lambda: os.getenv( "CELERY_BROKER_URL", "redis://localhost:6379/1" ) ) celery_result_backend: str = field( default_factory=lambda: os.getenv( "CELERY_RESULT_BACKEND", "redis://localhost:6379/2" ) ) celery_task_time_limit: int = 600 # seconds celery_task_soft_time_limit: int = 540 # seconds celery_worker_concurrency: int = 4 celery_worker_prefetch_multiplier: int = 1 # Authentication Configuration jwt_secret_key: str = field( default_factory=lambda: os.getenv( "JWT_SECRET_KEY", "change-me-in-production" ) ) jwt_algorithm: str = "HS256" jwt_expiration_delta: int = 3600 # seconds jwt_refresh_expiration_delta: int = 86400 * 7 # 7 days # OAuth Configuration oauth_google_client_id: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_ID", "")) oauth_google_client_secret: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_SECRET", "")) oauth_github_client_id: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_ID", "")) oauth_github_client_secret: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_SECRET", "")) # Email Configuration smtp_host: str = field(default_factory=lambda: os.getenv("SMTP_HOST", "smtp.gmail.com")) smtp_port: int = 587 smtp_user: str = field(default_factory=lambda: os.getenv("SMTP_USER", "")) smtp_password: str = field(default_factory=lambda: os.getenv("SMTP_PASSWORD", "")) smtp_use_tls: bool = True email_from: str = "noreply@backgroundfx.pro" email_from_name: str = "BackgroundFX Pro" # Monitoring Configuration sentry_dsn: str = field(default_factory=lambda: os.getenv("SENTRY_DSN", "")) sentry_environment: str = field(default_factory=lambda: os.getenv("ENVIRONMENT", "development")) sentry_traces_sample_rate: float = 0.1 prometheus_enabled: bool = True prometheus_port: int = 9090 # Webhook Configuration webhook_timeout: int = 30 # seconds webhook_max_retries: int = 3 webhook_retry_delay: int = 5 # seconds # Cache Configuration cache_ttl: int = 3600 # seconds cache_max_entries: int = 10000 cache_eviction_policy: str = "lru" # CDN Configuration cdn_enabled: bool = True cdn_base_url: str = "https://cdn.backgroundfx.pro" cdn_cache_control: str = "public, max-age=31536000" # Payment Configuration stripe_secret_key: str = field(default_factory=lambda: os.getenv("STRIPE_SECRET_KEY", "")) stripe_publishable_key: str = field(default_factory=lambda: os.getenv("STRIPE_PUBLISHABLE_KEY", "")) stripe_webhook_secret: str = field(default_factory=lambda: os.getenv("STRIPE_WEBHOOK_SECRET", "")) # Plan Limits plan_limits: Dict[str, Dict[str, Any]] = field(default_factory=lambda: { "free": { "images_per_month": 100, "videos_per_month": 5, "max_image_size": 10 * 1024 * 1024, # 10MB "max_video_size": 50 * 1024 * 1024, # 50MB "api_calls_per_hour": 100, "storage_gb": 1, "quality": "medium" }, "pro": { "images_per_month": 5000, "videos_per_month": 100, "max_image_size": 50 * 1024 * 1024, # 50MB "max_video_size": 200 * 1024 * 1024, # 200MB "api_calls_per_hour": 1000, "storage_gb": 50, "quality": "high" }, "enterprise": { "images_per_month": -1, # Unlimited "videos_per_month": -1, # Unlimited "max_image_size": 100 * 1024 * 1024, # 100MB "max_video_size": 500 * 1024 * 1024, # 500MB "api_calls_per_hour": -1, # Unlimited "storage_gb": 500, "quality": "ultra" } }) def __post_init__(self): """Initialize and validate settings.""" super().__post_init__() # Add production validator for production environment if self.environment == "production": self.add_validator(validate_production_config) # Setup model directories self.models_dir.mkdir(parents=True, exist_ok=True) self.model_cache_dir.mkdir(parents=True, exist_ok=True) # Setup local storage if using local backend if self.storage_backend == "local": self.local_storage_path.mkdir(parents=True, exist_ok=True) def get_database_config(self) -> Dict[str, Any]: """Get database configuration as dictionary.""" return { "url": self.database_url, "pool_size": self.database_pool_size, "max_overflow": self.database_max_overflow, "echo": self.database_echo } def get_redis_config(self) -> Dict[str, Any]: """Get Redis configuration as dictionary.""" return { "url": self.redis_url, "max_connections": self.redis_max_connections, "decode_responses": self.redis_decode_responses } def get_storage_config(self) -> Dict[str, Any]: """Get storage configuration as dictionary.""" config = { "backend": self.storage_backend, "bucket": self.storage_bucket, "region": self.storage_region, "use_ssl": self.storage_use_ssl } if self.storage_backend == "s3": config.update({ "access_key": self.storage_access_key, "secret_key": self.storage_secret_key, "endpoint_url": self.storage_endpoint_url }) elif self.storage_backend == "local": config["path"] = str(self.local_storage_path) return config def get_model_config(self, model_name: str) -> Dict[str, Any]: """Get configuration for specific model.""" if model_name not in self.model_configs: raise ConfigurationError(f"Model configuration not found: {model_name}") return self.model_configs[model_name] def get_plan_limits(self, plan: str) -> Dict[str, Any]: """Get limits for specific plan.""" if plan not in self.plan_limits: raise ConfigurationError(f"Plan not found: {plan}") return self.plan_limits[plan] def is_feature_enabled(self, feature: str) -> bool: """Check if a feature is enabled.""" feature_map = { "video": self.enable_video_processing, "batch": self.enable_batch_processing, "ai_backgrounds": self.enable_ai_backgrounds, "webhooks": self.enable_webhooks, "gpu": self.enable_gpu } return feature_map.get(feature, False) def load_settings(environment: Optional[str] = None) -> Settings: """ Load settings for specified environment. Args: environment: Environment name (development, staging, production, testing) If not specified, uses ENVIRONMENT env var or defaults to development Returns: Settings instance configured for the environment """ environment = environment or os.getenv("ENVIRONMENT", "development") # Import environment-specific configuration try: if environment == "development": from .environments.development import DevelopmentSettings settings_class = DevelopmentSettings elif environment == "staging": from .environments.staging import StagingSettings settings_class = StagingSettings elif environment == "production": from .environments.production import ProductionSettings settings_class = ProductionSettings elif environment == "testing": from .environments.testing import TestingSettings settings_class = TestingSettings else: logger.warning(f"Unknown environment: {environment}, using base settings") settings_class = Settings except ImportError: logger.warning(f"Environment module not found: {environment}, using base settings") settings_class = Settings # Create settings instance settings = settings_class() # Load from environment variables from .base import EnvironmentSource settings.add_source(EnvironmentSource()) # Load from .env file if exists env_file = Path(f".env.{environment}") if env_file.exists(): from .base import EnvFileSource settings.add_source(EnvFileSource(str(env_file))) elif Path(".env").exists(): from .base import EnvFileSource settings.add_source(EnvFileSource(".env")) # Load from config file if exists config_file = Path(f"config/{environment}.yaml") if config_file.exists(): from .base import YAMLFileSource settings.add_source(YAMLFileSource(str(config_file))) # Load all sources settings.load_from_sources() # Validate settings.validate() # Register with manager config_manager.register(environment, settings) config_manager.set_active(environment) logger.info(f"Loaded settings for environment: {environment}") return settings # Auto-load settings on import settings = load_settings() # Export commonly used settings DEBUG = settings.debug SECRET_KEY = settings.secret_key DATABASE_URL = settings.database_url REDIS_URL = settings.redis_url