"""
Main settings module for BackgroundFX Pro.

Automatically loads the appropriate configuration based on environment.
"""
|
|
|
import os |
|
from pathlib import Path |
|
from typing import Optional, Dict, Any |
|
from dataclasses import dataclass, field |
|
import logging |
|
|
|
from .base import BaseConfig, config_manager, ConfigurationError |
|
from .validators import validate_production_config |
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
def _env_field(name: str, default: str = "") -> Any:
    """Build a dataclass field whose default comes from an environment variable.

    The lookup is wrapped in a ``default_factory``, so ``os.getenv`` runs each
    time a settings object is instantiated rather than once at import time.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is not set.

    Returns:
        A ``dataclasses.field`` object suitable as a class-level default.
    """
    return field(default_factory=lambda: os.getenv(name, default))


@dataclass
class Settings(BaseConfig):
    """
    Main application settings with all configuration options.

    Every field has a default, either a literal or a value read from an
    environment variable at instantiation time. Attributes used below but not
    declared here (``environment``, ``add_validator`` and the ``enable_*``
    feature flags other than ``enable_gpu``) are presumably supplied by
    ``BaseConfig`` -- TODO confirm.
    """

    # --- API ---
    api_version: str = "v1"
    api_prefix: str = "/api"
    api_title: str = "BackgroundFX Pro API"
    api_description: str = "AI-powered background removal and replacement"
    api_docs_enabled: bool = True

    # --- Relational database ---
    database_url: str = _env_field(
        "DATABASE_URL",
        "postgresql://user:password@localhost/backgroundfx",
    )
    database_pool_size: int = 20
    database_max_overflow: int = 40
    database_echo: bool = False

    # --- MongoDB ---
    mongodb_url: str = _env_field(
        "MONGODB_URL",
        "mongodb://localhost:27017/backgroundfx",
    )
    mongodb_database: str = "backgroundfx"

    # --- Redis ---
    redis_url: str = _env_field("REDIS_URL", "redis://localhost:6379/0")
    redis_max_connections: int = 50
    redis_decode_responses: bool = True

    # --- Storage ---
    storage_backend: str = "s3"
    storage_bucket: str = "backgroundfx-uploads"
    storage_region: str = "us-east-1"
    storage_access_key: str = _env_field("AWS_ACCESS_KEY_ID")
    storage_secret_key: str = _env_field("AWS_SECRET_ACCESS_KEY")
    storage_endpoint_url: Optional[str] = None
    storage_use_ssl: bool = True
    local_storage_path: Path = field(default_factory=lambda: Path("storage"))

    # --- Processing limits (sizes presumably in bytes, timeout in seconds) ---
    max_image_size: int = 50 * 1024 * 1024
    max_video_size: int = 500 * 1024 * 1024
    max_batch_size: int = 100
    processing_timeout: int = 300
    enable_gpu: bool = True
    gpu_memory_fraction: float = 0.8

    # --- Models ---
    models_dir: Path = field(default_factory=lambda: Path("models"))
    model_cache_dir: Path = field(default_factory=lambda: Path("/tmp/model_cache"))
    default_model: str = "rembg"
    model_download_timeout: int = 600
    model_configs: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
        "rembg": {
            "path": "models/rembg",
            "version": "1.0.0",
            "gpu_enabled": True,
            "batch_size": 4,
        },
        "u2net": {
            "path": "models/u2net",
            "version": "1.0.0",
            "gpu_enabled": True,
            "batch_size": 2,
        },
        "sam2": {
            "path": "models/sam2",
            "version": "2.0.0",
            "gpu_enabled": True,
            "batch_size": 1,
        },
    })

    # --- Celery ---
    celery_broker_url: str = _env_field(
        "CELERY_BROKER_URL",
        "redis://localhost:6379/1",
    )
    celery_result_backend: str = _env_field(
        "CELERY_RESULT_BACKEND",
        "redis://localhost:6379/2",
    )
    celery_task_time_limit: int = 600
    celery_task_soft_time_limit: int = 540
    celery_worker_concurrency: int = 4
    celery_worker_prefetch_multiplier: int = 1

    # --- JWT ---
    # The fallback secret is a placeholder; override JWT_SECRET_KEY in real
    # deployments.
    jwt_secret_key: str = _env_field("JWT_SECRET_KEY", "change-me-in-production")
    jwt_algorithm: str = "HS256"
    jwt_expiration_delta: int = 3600
    jwt_refresh_expiration_delta: int = 86400 * 7

    # --- OAuth providers ---
    oauth_google_client_id: str = _env_field("GOOGLE_CLIENT_ID")
    oauth_google_client_secret: str = _env_field("GOOGLE_CLIENT_SECRET")
    oauth_github_client_id: str = _env_field("GITHUB_CLIENT_ID")
    oauth_github_client_secret: str = _env_field("GITHUB_CLIENT_SECRET")

    # --- E-mail ---
    smtp_host: str = _env_field("SMTP_HOST", "smtp.gmail.com")
    smtp_port: int = 587
    smtp_user: str = _env_field("SMTP_USER")
    smtp_password: str = _env_field("SMTP_PASSWORD")
    smtp_use_tls: bool = True
    # NOTE(review): this literal looks like e-mail obfuscation residue rather
    # than a real address -- confirm the intended sender address.
    email_from: str = "[email protected]"
    email_from_name: str = "BackgroundFX Pro"

    # --- Monitoring ---
    sentry_dsn: str = _env_field("SENTRY_DSN")
    sentry_environment: str = _env_field("ENVIRONMENT", "development")
    sentry_traces_sample_rate: float = 0.1

    prometheus_enabled: bool = True
    prometheus_port: int = 9090

    # --- Webhooks ---
    webhook_timeout: int = 30
    webhook_max_retries: int = 3
    webhook_retry_delay: int = 5

    # --- Cache ---
    cache_ttl: int = 3600
    cache_max_entries: int = 10000
    cache_eviction_policy: str = "lru"

    # --- CDN ---
    cdn_enabled: bool = True
    cdn_base_url: str = "https://cdn.backgroundfx.pro"
    cdn_cache_control: str = "public, max-age=31536000"

    # --- Stripe ---
    stripe_secret_key: str = _env_field("STRIPE_SECRET_KEY")
    stripe_publishable_key: str = _env_field("STRIPE_PUBLISHABLE_KEY")
    stripe_webhook_secret: str = _env_field("STRIPE_WEBHOOK_SECRET")

    # --- Subscription plans ---
    # NOTE(review): -1 presumably means "unlimited" -- confirm against the
    # code that enforces these limits.
    plan_limits: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
        "free": {
            "images_per_month": 100,
            "videos_per_month": 5,
            "max_image_size": 10 * 1024 * 1024,
            "max_video_size": 50 * 1024 * 1024,
            "api_calls_per_hour": 100,
            "storage_gb": 1,
            "quality": "medium",
        },
        "pro": {
            "images_per_month": 5000,
            "videos_per_month": 100,
            "max_image_size": 50 * 1024 * 1024,
            "max_video_size": 200 * 1024 * 1024,
            "api_calls_per_hour": 1000,
            "storage_gb": 50,
            "quality": "high",
        },
        "enterprise": {
            "images_per_month": -1,
            "videos_per_month": -1,
            "max_image_size": 100 * 1024 * 1024,
            "max_video_size": 500 * 1024 * 1024,
            "api_calls_per_hour": -1,
            "storage_gb": 500,
            "quality": "ultra",
        },
    })

    def __post_init__(self):
        """Initialize and validate settings.

        Runs the base-class hook, registers the production validator when the
        environment is ``"production"``, and creates the model directories
        (plus the local storage directory when the backend is ``"local"``).
        """
        super().__post_init__()

        if self.environment == "production":
            self.add_validator(validate_production_config)

        # Directory creation is a filesystem side effect of instantiation.
        self.models_dir.mkdir(parents=True, exist_ok=True)
        self.model_cache_dir.mkdir(parents=True, exist_ok=True)

        if self.storage_backend == "local":
            self.local_storage_path.mkdir(parents=True, exist_ok=True)

    def get_database_config(self) -> Dict[str, Any]:
        """Get database configuration as dictionary."""
        return {
            "url": self.database_url,
            "pool_size": self.database_pool_size,
            "max_overflow": self.database_max_overflow,
            "echo": self.database_echo,
        }

    def get_redis_config(self) -> Dict[str, Any]:
        """Get Redis configuration as dictionary."""
        return {
            "url": self.redis_url,
            "max_connections": self.redis_max_connections,
            "decode_responses": self.redis_decode_responses,
        }

    def get_storage_config(self) -> Dict[str, Any]:
        """Get storage configuration as dictionary.

        The common keys are always present; ``"s3"`` adds credentials and the
        endpoint URL, ``"local"`` adds the storage path as a string. Any other
        backend value yields only the common keys.
        """
        config = {
            "backend": self.storage_backend,
            "bucket": self.storage_bucket,
            "region": self.storage_region,
            "use_ssl": self.storage_use_ssl,
        }

        if self.storage_backend == "s3":
            config.update({
                "access_key": self.storage_access_key,
                "secret_key": self.storage_secret_key,
                "endpoint_url": self.storage_endpoint_url,
            })
        elif self.storage_backend == "local":
            config["path"] = str(self.local_storage_path)

        return config

    def get_model_config(self, model_name: str) -> Dict[str, Any]:
        """Get configuration for specific model.

        Returns the stored dictionary itself, not a copy.

        Raises:
            ConfigurationError: If ``model_name`` is not in ``model_configs``.
        """
        if model_name not in self.model_configs:
            raise ConfigurationError(f"Model configuration not found: {model_name}")
        return self.model_configs[model_name]

    def get_plan_limits(self, plan: str) -> Dict[str, Any]:
        """Get limits for specific plan.

        Returns the stored dictionary itself, not a copy.

        Raises:
            ConfigurationError: If ``plan`` is not in ``plan_limits``.
        """
        if plan not in self.plan_limits:
            raise ConfigurationError(f"Plan not found: {plan}")
        return self.plan_limits[plan]

    def is_feature_enabled(self, feature: str) -> bool:
        """Check if a feature is enabled.

        Args:
            feature: One of ``"video"``, ``"batch"``, ``"ai_backgrounds"``,
                ``"webhooks"`` or ``"gpu"``.

        Returns:
            The truth value of the matching ``enable_*`` attribute, or
            ``False`` for an unknown feature or an attribute that is not set.
        """
        feature_attributes = {
            "video": "enable_video_processing",
            "batch": "enable_batch_processing",
            "ai_backgrounds": "enable_ai_backgrounds",
            "webhooks": "enable_webhooks",
            "gpu": "enable_gpu",
        }
        attribute = feature_attributes.get(feature)
        if attribute is None:
            return False
        # Look up only the requested flag, so a flag that is not defined on
        # this object cannot break queries for unrelated features.
        return bool(getattr(self, attribute, False))
|
|
|
|
|
def load_settings(environment: Optional[str] = None) -> Settings:
    """
    Load settings for specified environment.

    The settings class is chosen by environment name, configuration sources
    are attached (process environment, an env file, an optional YAML file),
    values are loaded and validated, and the result is registered with
    ``config_manager`` and made active.

    Args:
        environment: Environment name (development, staging, production, testing)
                    If not specified, uses ENVIRONMENT env var or defaults to development

    Returns:
        Settings instance configured for the environment
    """
    environment = environment or os.getenv("ENVIRONMENT", "development")

    # Environment modules are imported lazily so only the selected one loads.
    try:
        if environment == "development":
            from .environments.development import DevelopmentSettings
            settings_class = DevelopmentSettings
        elif environment == "staging":
            from .environments.staging import StagingSettings
            settings_class = StagingSettings
        elif environment == "production":
            from .environments.production import ProductionSettings
            settings_class = ProductionSettings
        elif environment == "testing":
            from .environments.testing import TestingSettings
            settings_class = TestingSettings
        else:
            logger.warning("Unknown environment: %s, using base settings", environment)
            settings_class = Settings
    except ImportError:
        # Best-effort fallback. The traceback is logged because an ImportError
        # can also come from a broken import inside the environment module,
        # not only from the module being absent.
        logger.warning(
            "Environment module not found: %s, using base settings",
            environment,
            exc_info=True,
        )
        settings_class = Settings

    loaded = settings_class()

    # Sources are attached in this order; how they are prioritised is decided
    # by load_from_sources(), which is not defined in this module.
    from .base import EnvironmentSource
    loaded.add_source(EnvironmentSource())

    # Prefer the environment-specific env file and fall back to a plain ".env".
    env_file = Path(f".env.{environment}")
    if not env_file.exists():
        env_file = Path(".env")
    if env_file.exists():
        from .base import EnvFileSource
        loaded.add_source(EnvFileSource(str(env_file)))

    config_file = Path(f"config/{environment}.yaml")
    if config_file.exists():
        from .base import YAMLFileSource
        loaded.add_source(YAMLFileSource(str(config_file)))

    loaded.load_from_sources()
    loaded.validate()

    config_manager.register(environment, loaded)
    config_manager.set_active(environment)

    logger.info("Loaded settings for environment: %s", environment)

    return loaded
|
|
|
|
|
|
|
# Load the active settings when this module is imported. This runs everything
# load_settings() does, including registering with config_manager.
settings = load_settings()

# Module-level aliases for frequently used values. `debug` and `secret_key`
# are not declared on Settings in this module; presumably they come from
# BaseConfig -- TODO confirm.
DEBUG = settings.debug
SECRET_KEY = settings.secret_key
DATABASE_URL = settings.database_url
REDIS_URL = settings.redis_url