"""
Main settings module for BackgroundFX Pro.
Automatically loads the appropriate configuration based on environment.
"""
import os
from pathlib import Path
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
import logging
from .base import BaseConfig, config_manager, ConfigurationError
from .validators import validate_production_config
logger = logging.getLogger(__name__)
@dataclass
class Settings(BaseConfig):
"""
Main application settings with all configuration options.
"""
# API Configuration
api_version: str = "v1"
api_prefix: str = "/api"
api_title: str = "BackgroundFX Pro API"
api_description: str = "AI-powered background removal and replacement"
api_docs_enabled: bool = True
# Database Configuration
database_url: str = field(
default_factory=lambda: os.getenv(
"DATABASE_URL",
"postgresql://user:password@localhost/backgroundfx"
)
)
database_pool_size: int = 20
database_max_overflow: int = 40
database_echo: bool = False
# MongoDB Configuration
mongodb_url: str = field(
default_factory=lambda: os.getenv(
"MONGODB_URL",
"mongodb://localhost:27017/backgroundfx"
)
)
mongodb_database: str = "backgroundfx"
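    # Example (sketch only): wiring the MongoDB fields above into a client, assuming
    # the app uses pymongo (the driver choice is an assumption, not confirmed here):
    #
    #     from pymongo import MongoClient
    #     client = MongoClient(settings.mongodb_url)   # connection string from env/default
    #     db = client[settings.mongodb_database]       # e.g. db["jobs"].insert_one({...})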
# Redis Configuration
redis_url: str = field(
default_factory=lambda: os.getenv(
"REDIS_URL",
"redis://localhost:6379/0"
)
)
redis_max_connections: int = 50
redis_decode_responses: bool = True
# Storage Configuration
storage_backend: str = "s3" # Options: s3, local, gcs, azure
storage_bucket: str = "backgroundfx-uploads"
storage_region: str = "us-east-1"
storage_access_key: str = field(default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID", ""))
storage_secret_key: str = field(default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY", ""))
storage_endpoint_url: Optional[str] = None # For S3-compatible services
storage_use_ssl: bool = True
local_storage_path: Path = field(default_factory=lambda: Path("storage"))
# Processing Configuration
max_image_size: int = 50 * 1024 * 1024 # 50MB
max_video_size: int = 500 * 1024 * 1024 # 500MB
max_batch_size: int = 100
processing_timeout: int = 300 # seconds
    enable_gpu: bool = True
    gpu_memory_fraction: float = 0.8
    # Feature flags (queried by is_feature_enabled below; default values are an assumption)
    enable_video_processing: bool = True
    enable_batch_processing: bool = True
    enable_ai_backgrounds: bool = True
    enable_webhooks: bool = True
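    # Example (sketch only): one way the GPU settings above might be applied, assuming
    # PyTorch is the inference backend (an assumption; this module does not import it):
    #
    #     import torch
    #     if settings.enable_gpu and torch.cuda.is_available():
    #         # Cap per-process GPU memory to the configured fraction (default 0.8)
    #         torch.cuda.set_per_process_memory_fraction(settings.gpu_memory_fraction)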
# Model Configuration
models_dir: Path = field(default_factory=lambda: Path("models"))
model_cache_dir: Path = field(default_factory=lambda: Path("/tmp/model_cache"))
default_model: str = "rembg"
model_download_timeout: int = 600 # seconds
model_configs: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"rembg": {
"path": "models/rembg",
"version": "1.0.0",
"gpu_enabled": True,
"batch_size": 4
},
"u2net": {
"path": "models/u2net",
"version": "1.0.0",
"gpu_enabled": True,
"batch_size": 2
},
"sam2": {
"path": "models/sam2",
"version": "2.0.0",
"gpu_enabled": True,
"batch_size": 1
}
})
# Queue Configuration
celery_broker_url: str = field(
default_factory=lambda: os.getenv(
"CELERY_BROKER_URL",
"redis://localhost:6379/1"
)
)
celery_result_backend: str = field(
default_factory=lambda: os.getenv(
"CELERY_RESULT_BACKEND",
"redis://localhost:6379/2"
)
)
celery_task_time_limit: int = 600 # seconds
celery_task_soft_time_limit: int = 540 # seconds
celery_worker_concurrency: int = 4
celery_worker_prefetch_multiplier: int = 1
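    # Example (sketch only): mapping these queue fields onto a Celery app; the app
    # name "backgroundfx" is an assumption:
    #
    #     from celery import Celery
    #     app = Celery("backgroundfx",
    #                  broker=settings.celery_broker_url,
    #                  backend=settings.celery_result_backend)
    #     app.conf.update(
    #         task_time_limit=settings.celery_task_time_limit,
    #         task_soft_time_limit=settings.celery_task_soft_time_limit,
    #         worker_concurrency=settings.celery_worker_concurrency,
    #         worker_prefetch_multiplier=settings.celery_worker_prefetch_multiplier,
    #     )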
# Authentication Configuration
jwt_secret_key: str = field(
default_factory=lambda: os.getenv(
"JWT_SECRET_KEY",
"change-me-in-production"
)
)
jwt_algorithm: str = "HS256"
jwt_expiration_delta: int = 3600 # seconds
jwt_refresh_expiration_delta: int = 86400 * 7 # 7 days
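    # Example (sketch only): issuing and verifying a token with these JWT fields,
    # assuming PyJWT; "user_id" is a hypothetical value:
    #
    #     import time
    #     import jwt  # PyJWT
    #     payload = {"sub": user_id,
    #                "exp": int(time.time()) + settings.jwt_expiration_delta}
    #     token = jwt.encode(payload, settings.jwt_secret_key,
    #                        algorithm=settings.jwt_algorithm)
    #     claims = jwt.decode(token, settings.jwt_secret_key,
    #                         algorithms=[settings.jwt_algorithm])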
# OAuth Configuration
oauth_google_client_id: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_ID", ""))
oauth_google_client_secret: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_SECRET", ""))
oauth_github_client_id: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_ID", ""))
oauth_github_client_secret: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_SECRET", ""))
# Email Configuration
smtp_host: str = field(default_factory=lambda: os.getenv("SMTP_HOST", "smtp.gmail.com"))
smtp_port: int = 587
smtp_user: str = field(default_factory=lambda: os.getenv("SMTP_USER", ""))
smtp_password: str = field(default_factory=lambda: os.getenv("SMTP_PASSWORD", ""))
smtp_use_tls: bool = True
email_from: str = "[email protected]"
email_from_name: str = "BackgroundFX Pro"
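    # Example (sketch only): sending mail with these SMTP fields via the standard
    # library; "recipient" is a hypothetical variable:
    #
    #     import smtplib
    #     from email.message import EmailMessage
    #     msg = EmailMessage()
    #     msg["From"] = f"{settings.email_from_name} <{settings.email_from}>"
    #     msg["To"] = recipient
    #     msg["Subject"] = "..."
    #     msg.set_content("...")
    #     with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as smtp:
    #         if settings.smtp_use_tls:
    #             smtp.starttls()
    #         smtp.login(settings.smtp_user, settings.smtp_password)
    #         smtp.send_message(msg)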
# Monitoring Configuration
sentry_dsn: str = field(default_factory=lambda: os.getenv("SENTRY_DSN", ""))
sentry_environment: str = field(default_factory=lambda: os.getenv("ENVIRONMENT", "development"))
sentry_traces_sample_rate: float = 0.1
prometheus_enabled: bool = True
prometheus_port: int = 9090
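    # Example (sketch only): initialising monitoring from these fields, assuming the
    # sentry-sdk and prometheus_client packages (assumptions, not imported here):
    #
    #     import sentry_sdk
    #     from prometheus_client import start_http_server
    #     if settings.sentry_dsn:
    #         sentry_sdk.init(dsn=settings.sentry_dsn,
    #                         environment=settings.sentry_environment,
    #                         traces_sample_rate=settings.sentry_traces_sample_rate)
    #     if settings.prometheus_enabled:
    #         start_http_server(settings.prometheus_port)  # exposes /metrics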
# Webhook Configuration
webhook_timeout: int = 30 # seconds
webhook_max_retries: int = 3
webhook_retry_delay: int = 5 # seconds
# Cache Configuration
cache_ttl: int = 3600 # seconds
cache_max_entries: int = 10000
cache_eviction_policy: str = "lru"
# CDN Configuration
cdn_enabled: bool = True
cdn_base_url: str = "https://cdn.backgroundfx.pro"
cdn_cache_control: str = "public, max-age=31536000"
# Payment Configuration
stripe_secret_key: str = field(default_factory=lambda: os.getenv("STRIPE_SECRET_KEY", ""))
stripe_publishable_key: str = field(default_factory=lambda: os.getenv("STRIPE_PUBLISHABLE_KEY", ""))
stripe_webhook_secret: str = field(default_factory=lambda: os.getenv("STRIPE_WEBHOOK_SECRET", ""))
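    # Example (sketch only): verifying an incoming Stripe webhook with these keys,
    # assuming the official stripe package; "payload" and "sig_header" are
    # hypothetical request values:
    #
    #     import stripe
    #     stripe.api_key = settings.stripe_secret_key
    #     event = stripe.Webhook.construct_event(
    #         payload, sig_header, settings.stripe_webhook_secret)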
# Plan Limits
plan_limits: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"free": {
"images_per_month": 100,
"videos_per_month": 5,
"max_image_size": 10 * 1024 * 1024, # 10MB
"max_video_size": 50 * 1024 * 1024, # 50MB
"api_calls_per_hour": 100,
"storage_gb": 1,
"quality": "medium"
},
"pro": {
"images_per_month": 5000,
"videos_per_month": 100,
"max_image_size": 50 * 1024 * 1024, # 50MB
"max_video_size": 200 * 1024 * 1024, # 200MB
"api_calls_per_hour": 1000,
"storage_gb": 50,
"quality": "high"
},
"enterprise": {
"images_per_month": -1, # Unlimited
"videos_per_month": -1, # Unlimited
"max_image_size": 100 * 1024 * 1024, # 100MB
"max_video_size": 500 * 1024 * 1024, # 500MB
"api_calls_per_hour": -1, # Unlimited
"storage_gb": 500,
"quality": "ultra"
}
})
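    # Example (sketch only): enforcing a plan limit, where -1 means "unlimited" as
    # noted above; "user", "images_used_this_month", and QuotaExceededError are hypothetical:
    #
    #     limits = settings.get_plan_limits(user.plan)
    #     cap = limits["images_per_month"]
    #     if cap != -1 and images_used_this_month >= cap:
    #         raise QuotaExceededError("monthly image quota reached")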
def __post_init__(self):
"""Initialize and validate settings."""
super().__post_init__()
# Add production validator for production environment
if self.environment == "production":
self.add_validator(validate_production_config)
# Setup model directories
self.models_dir.mkdir(parents=True, exist_ok=True)
self.model_cache_dir.mkdir(parents=True, exist_ok=True)
# Setup local storage if using local backend
if self.storage_backend == "local":
self.local_storage_path.mkdir(parents=True, exist_ok=True)
def get_database_config(self) -> Dict[str, Any]:
"""Get database configuration as dictionary."""
return {
"url": self.database_url,
"pool_size": self.database_pool_size,
"max_overflow": self.database_max_overflow,
"echo": self.database_echo
}
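    # Example (sketch only): feeding this dict into SQLAlchemy, assuming the app uses
    # a synchronous SQLAlchemy engine (an assumption):
    #
    #     from sqlalchemy import create_engine
    #     db = settings.get_database_config()
    #     engine = create_engine(db["url"], pool_size=db["pool_size"],
    #                            max_overflow=db["max_overflow"], echo=db["echo"])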
def get_redis_config(self) -> Dict[str, Any]:
"""Get Redis configuration as dictionary."""
return {
"url": self.redis_url,
"max_connections": self.redis_max_connections,
"decode_responses": self.redis_decode_responses
}
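    # Example (sketch only): building a client from this dict with redis-py (the
    # driver choice is an assumption):
    #
    #     import redis
    #     cfg = settings.get_redis_config()
    #     client = redis.Redis.from_url(cfg["url"],
    #                                   max_connections=cfg["max_connections"],
    #                                   decode_responses=cfg["decode_responses"])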
def get_storage_config(self) -> Dict[str, Any]:
"""Get storage configuration as dictionary."""
config = {
"backend": self.storage_backend,
"bucket": self.storage_bucket,
"region": self.storage_region,
"use_ssl": self.storage_use_ssl
}
if self.storage_backend == "s3":
config.update({
"access_key": self.storage_access_key,
"secret_key": self.storage_secret_key,
"endpoint_url": self.storage_endpoint_url
})
elif self.storage_backend == "local":
config["path"] = str(self.local_storage_path)
return config
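    # Example (sketch only): using this dict with boto3 when the S3 backend is active
    # (boto3 is an assumption; the S3-only keys are guarded as in the method above):
    #
    #     import boto3
    #     cfg = settings.get_storage_config()
    #     if cfg["backend"] == "s3":
    #         s3 = boto3.client("s3",
    #                           region_name=cfg["region"],
    #                           aws_access_key_id=cfg["access_key"],
    #                           aws_secret_access_key=cfg["secret_key"],
    #                           endpoint_url=cfg["endpoint_url"],
    #                           use_ssl=cfg["use_ssl"])
    #         s3.upload_file("local.png", cfg["bucket"], "uploads/local.png")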
def get_model_config(self, model_name: str) -> Dict[str, Any]:
"""Get configuration for specific model."""
if model_name not in self.model_configs:
raise ConfigurationError(f"Model configuration not found: {model_name}")
return self.model_configs[model_name]
def get_plan_limits(self, plan: str) -> Dict[str, Any]:
"""Get limits for specific plan."""
if plan not in self.plan_limits:
raise ConfigurationError(f"Plan not found: {plan}")
return self.plan_limits[plan]
def is_feature_enabled(self, feature: str) -> bool:
"""Check if a feature is enabled."""
feature_map = {
"video": self.enable_video_processing,
"batch": self.enable_batch_processing,
"ai_backgrounds": self.enable_ai_backgrounds,
"webhooks": self.enable_webhooks,
"gpu": self.enable_gpu
}
return feature_map.get(feature, False)
def load_settings(environment: Optional[str] = None) -> Settings:
"""
Load settings for specified environment.
Args:
environment: Environment name (development, staging, production, testing)
If not specified, uses ENVIRONMENT env var or defaults to development
Returns:
Settings instance configured for the environment
"""
environment = environment or os.getenv("ENVIRONMENT", "development")
# Import environment-specific configuration
try:
if environment == "development":
from .environments.development import DevelopmentSettings
settings_class = DevelopmentSettings
elif environment == "staging":
from .environments.staging import StagingSettings
settings_class = StagingSettings
elif environment == "production":
from .environments.production import ProductionSettings
settings_class = ProductionSettings
elif environment == "testing":
from .environments.testing import TestingSettings
settings_class = TestingSettings
else:
logger.warning(f"Unknown environment: {environment}, using base settings")
settings_class = Settings
except ImportError:
logger.warning(f"Environment module not found: {environment}, using base settings")
settings_class = Settings
# Create settings instance
settings = settings_class()
# Load from environment variables
from .base import EnvironmentSource
settings.add_source(EnvironmentSource())
# Load from .env file if exists
env_file = Path(f".env.{environment}")
if env_file.exists():
from .base import EnvFileSource
settings.add_source(EnvFileSource(str(env_file)))
elif Path(".env").exists():
from .base import EnvFileSource
settings.add_source(EnvFileSource(".env"))
# Load from config file if exists
config_file = Path(f"config/{environment}.yaml")
if config_file.exists():
from .base import YAMLFileSource
settings.add_source(YAMLFileSource(str(config_file)))
# Load all sources
settings.load_from_sources()
# Validate
settings.validate()
# Register with manager
config_manager.register(environment, settings)
config_manager.set_active(environment)
logger.info(f"Loaded settings for environment: {environment}")
return settings
# Auto-load settings on import
settings = load_settings()
# Export commonly used settings
DEBUG = settings.debug
SECRET_KEY = settings.secret_key
DATABASE_URL = settings.database_url
REDIS_URL = settings.redis_url
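# Example (sketch only): typical use of this module; assumes the per-environment
# modules and configuration sources resolve as described in load_settings above:
#
#     from config.settings import load_settings, settings
#     prod = load_settings("production")          # explicit environment
#     db_cfg = settings.get_database_config()     # instance auto-loaded on import
#     if settings.is_feature_enabled("gpu"):
#         ...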