# Commit a274daf by MogensR: "Create config/services/database.py"
"""
Database configuration for BackgroundFX Pro.
Handles PostgreSQL, MongoDB, and other database connections.
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool, NullPool
import pymongo
from pymongo import MongoClient
import logging
# Module-level logger named after this module; used by every manager below.
logger = logging.getLogger(__name__)
@dataclass
class DatabaseConfig:
"""Database configuration settings."""
# PostgreSQL settings
postgres_url: str = "postgresql://user:password@localhost/backgroundfx"
postgres_pool_size: int = 20
postgres_max_overflow: int = 40
postgres_pool_timeout: int = 30
postgres_pool_recycle: int = 3600
postgres_echo: bool = False
postgres_echo_pool: bool = False
# MongoDB settings
mongodb_url: str = "mongodb://localhost:27017/backgroundfx"
mongodb_database: str = "backgroundfx"
mongodb_max_pool_size: int = 50
mongodb_min_pool_size: int = 10
mongodb_max_idle_time: int = 60000 # milliseconds
mongodb_server_selection_timeout: int = 30000 # milliseconds
# Connection retry settings
max_retries: int = 3
retry_delay: int = 1 # seconds
# Query settings
query_timeout: int = 30 # seconds
slow_query_threshold: float = 1.0 # seconds
log_slow_queries: bool = True
# Migration settings
auto_migrate: bool = False
migration_directory: str = "migrations"
class PostgreSQLManager:
    """Manage the SQLAlchemy engine and session factory for PostgreSQL.

    Both the engine and the session factory are built lazily on first access
    and then cached on the instance.
    """

    def __init__(self, config: DatabaseConfig):
        """Store the configuration; nothing is connected yet.

        Args:
            config: Settings providing the ``postgres_*`` values.
        """
        self.config = config
        self._engine: Optional[Engine] = None
        self._session_factory: Optional[sessionmaker] = None

    @property
    def engine(self) -> Engine:
        """Return the cached engine, creating it on first access."""
        if self._engine is None:
            self._engine = self._create_engine()
        return self._engine

    @property
    def session_factory(self) -> sessionmaker:
        """Return the cached session factory, creating it on first access."""
        if self._session_factory is None:
            self._session_factory = sessionmaker(
                bind=self.engine,
                expire_on_commit=False,
            )
        return self._session_factory

    def _create_engine(self) -> Engine:
        """Create the SQLAlchemy engine from the stored configuration.

        A configured pool size of 0 selects ``NullPool`` and passes no pool
        sizing arguments; any other value selects ``QueuePool`` with the
        size, overflow, timeout and recycle settings from the config.

        Returns:
            The newly created engine.
        """
        if self.config.postgres_pool_size == 0:
            poolclass = NullPool
            pool_kwargs = {}
        else:
            poolclass = QueuePool
            pool_kwargs = {
                'pool_size': self.config.postgres_pool_size,
                'max_overflow': self.config.postgres_max_overflow,
                'pool_timeout': self.config.postgres_pool_timeout,
                'pool_recycle': self.config.postgres_pool_recycle,
            }

        engine = create_engine(
            self.config.postgres_url,
            poolclass=poolclass,
            echo=self.config.postgres_echo,
            echo_pool=self.config.postgres_echo_pool,
            pool_pre_ping=True,  # Verify connections before using
            **pool_kwargs,
        )
        logger.info(
            "PostgreSQL engine created with pool size: %s",
            self.config.postgres_pool_size,
        )
        return engine

    def get_session(self) -> Session:
        """Return a new session produced by the session factory."""
        return self.session_factory()

    def healthcheck(self) -> bool:
        """Run ``SELECT 1`` against the database.

        Returns:
            True when the query returns 1. Any exception is logged and
            reported as False rather than raised.
        """
        # Local import keeps this fix independent of the module import list.
        from sqlalchemy import text

        try:
            with self.engine.connect() as conn:
                # SQLAlchemy 2.0 no longer executes bare strings; without
                # text() this call raised and the check always reported False.
                result = conn.execute(text("SELECT 1"))
                return result.scalar() == 1
        except Exception as e:
            logger.error("PostgreSQL healthcheck failed: %s", e)
            return False

    def close(self):
        """Dispose of the engine's connections if an engine was created.

        The cached engine reference is left in place.
        """
        if self._engine is not None:
            self._engine.dispose()
            logger.info("PostgreSQL connections closed")
class MongoDBManager:
    """Manage a lazily created MongoDB client and database handle."""

    def __init__(self, config: DatabaseConfig):
        """Store the configuration; nothing is connected yet.

        Args:
            config: Settings providing the ``mongodb_*`` values.
        """
        self.config = config
        self._client: Optional[MongoClient] = None
        self._database = None

    @property
    def client(self) -> MongoClient:
        """Return the cached client, creating it on first access."""
        if self._client is None:
            self._client = self._create_client()
        return self._client

    @property
    def database(self):
        """Return the cached handle for the configured database."""
        # Compare with None explicitly rather than relying on truthiness.
        if self._database is None:
            self._database = self.client[self.config.mongodb_database]
        return self._database

    def _create_client(self) -> MongoClient:
        """Create a MongoDB client and verify it with a ``ping``.

        Returns:
            The connected client.

        Raises:
            Whatever ``MongoClient`` or the ``ping`` command raises; nothing
            is caught here.
        """
        client = MongoClient(
            self.config.mongodb_url,
            maxPoolSize=self.config.mongodb_max_pool_size,
            minPoolSize=self.config.mongodb_min_pool_size,
            maxIdleTimeMS=self.config.mongodb_max_idle_time,
            serverSelectionTimeoutMS=self.config.mongodb_server_selection_timeout,
            retryWrites=True,
            retryReads=True,
        )
        # Verify connection
        client.admin.command('ping')
        logger.info(
            "MongoDB client created for database: %s",
            self.config.mongodb_database,
        )
        return client

    def get_collection(self, name: str):
        """Return the collection called *name* from the configured database."""
        return self.database[name]

    def healthcheck(self) -> bool:
        """Ping the server.

        Returns:
            True when the ping succeeds. Any exception, including one raised
            while creating the client, is logged and reported as False.
        """
        try:
            self.client.admin.command('ping')
            return True
        except Exception as e:
            logger.error("MongoDB healthcheck failed: %s", e)
            return False

    def close(self):
        """Close the client, if one exists, and forget the cached handles.

        Clearing the cached client and database means a later access builds a
        fresh client instead of reusing the closed one.
        NOTE(review): PyMongo 4 documents that a closed client cannot run
        further operations; confirm against the installed version.
        """
        client = self._client
        if client is None:
            return
        # Drop the references first so the state is reset even if close() raises.
        self._client = None
        self._database = None
        client.close()
        logger.info("MongoDB connection closed")
class DatabaseManager:
    """Central database manager for all database connections."""

    def __init__(self, config: DatabaseConfig):
        """Create the PostgreSQL and MongoDB managers from one configuration.

        Args:
            config: Settings shared by both managers.
        """
        self.config = config
        self.postgres = PostgreSQLManager(config)
        self.mongodb = MongoDBManager(config)

    def initialize(self):
        """Health-check both databases and log the outcome of each.

        A failed check is only logged; no exception is raised here.
        """
        logger.info("Initializing database connections...")

        if self.postgres.healthcheck():
            logger.info("PostgreSQL connection established")
        else:
            logger.error("Failed to connect to PostgreSQL")

        if self.mongodb.healthcheck():
            logger.info("MongoDB connection established")
        else:
            logger.error("Failed to connect to MongoDB")

    def healthcheck(self) -> Dict[str, bool]:
        """Check health of all databases.

        Returns:
            Mapping with the keys ``'postgresql'`` and ``'mongodb'``, each
            holding that manager's health-check result.
        """
        return {
            'postgresql': self.postgres.healthcheck(),
            'mongodb': self.mongodb.healthcheck(),
        }

    def close_all(self):
        """Close both managers.

        The MongoDB manager is closed even when closing PostgreSQL raises; in
        that case the exception still propagates afterwards.
        """
        try:
            self.postgres.close()
        finally:
            self.mongodb.close()
        logger.info("All database connections closed")

    @staticmethod
    def _pool_metric(pool: Any, name: str) -> Any:
        """Call ``pool.<name>()`` if the pool has that method, else return None."""
        method = getattr(pool, name, None)
        return method() if callable(method) else None

    def get_stats(self) -> Dict[str, Any]:
        """Collect statistics for the connections created so far.

        Only managers whose engine or client already exists are reported, so
        calling this never opens a new connection.

        Returns:
            Mapping that may contain ``'postgresql'`` (pool counters, None
            for counters the pool does not expose) and ``'mongodb'`` (the
            result of ``server_info()``). Exceptions from ``server_info()``
            propagate to the caller.
        """
        stats: Dict[str, Any] = {}

        engine = self.postgres._engine
        if engine is not None:
            pool = engine.pool
            stats['postgresql'] = {
                'size': self._pool_metric(pool, 'size'),
                'checked_in': self._pool_metric(pool, 'checkedin'),
                'checked_out': self._pool_metric(pool, 'checkedout'),
                'overflow': self._pool_metric(pool, 'overflow'),
                # NOTE(review): stock SQLAlchemy pools do not appear to define
                # total(), so this is probably always None; key kept for
                # backward compatibility.
                'total': self._pool_metric(pool, 'total'),
            }

        client = self.mongodb._client
        if client is not None:
            stats['mongodb'] = client.server_info()

        return stats
def create_database_config(settings: Any) -> DatabaseConfig:
    """
    Build a DatabaseConfig from application settings.

    Six values are copied from ``settings``; every other field keeps its
    dataclass default. A missing attribute on ``settings`` raises
    AttributeError.

    Args:
        settings: Object exposing ``database_url``, ``database_pool_size``,
            ``database_max_overflow``, ``database_echo``, ``mongodb_url``
            and ``mongodb_database``.

    Returns:
        DatabaseConfig instance
    """
    overrides = {
        'postgres_url': settings.database_url,
        'postgres_pool_size': settings.database_pool_size,
        'postgres_max_overflow': settings.database_max_overflow,
        'postgres_echo': settings.database_echo,
        'mongodb_url': settings.mongodb_url,
        'mongodb_database': settings.mongodb_database,
    }
    return DatabaseConfig(**overrides)
# Global database manager instance. Stays None until initialize_databases()
# assigns it; get_db_manager() raises RuntimeError while it is still None.
db_manager: Optional[DatabaseManager] = None
def initialize_databases(settings: Any):
    """
    Create, register and initialize the global database manager.

    The module-level ``db_manager`` is assigned before ``initialize()`` runs,
    and any previously registered manager is replaced.

    Args:
        settings: Application settings passed to create_database_config()

    Returns:
        The newly registered DatabaseManager
    """
    global db_manager
    db_manager = DatabaseManager(create_database_config(settings))
    db_manager.initialize()
    return db_manager
def get_db_manager() -> DatabaseManager:
    """Return the global database manager.

    Raises:
        RuntimeError: If no manager has been assigned to ``db_manager`` yet.
    """
    manager = db_manager
    if manager is not None:
        return manager
    raise RuntimeError("Database manager not initialized")