""" Database Initialization Module Handles database creation, schema setup, and health checks """ import os import aiosqlite import logging from pathlib import Path from typing import Dict, Any, List logger = logging.getLogger(__name__) class DatabaseInitializer: """Handles database initialization and health checks""" def __init__(self, db_path: str = None): self.db_path = db_path or os.getenv("DATABASE_PATH", "./ai_tutor.db") self.schema_path = self._find_schema_file() def _find_schema_file(self) -> str: """Return the path to the schema.sql file. The schema.sql file is expected to be in the same directory as this script. """ schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql') if not os.path.exists(schema_path): raise FileNotFoundError(f"schema.sql not found at {schema_path}") return schema_path async def check_database_exists(self) -> bool: """Check if database file exists""" return os.path.exists(self.db_path) async def check_database_health(self) -> Dict[str, Any]: """Comprehensive database health check""" health_status = { "database_exists": False, "database_accessible": False, "schema_loaded": False, "tables_exist": False, "views_exist": False, "can_write": False, "record_count": {}, "errors": [] } try: # Check if database file exists health_status["database_exists"] = await self.check_database_exists() if not health_status["database_exists"]: health_status["errors"].append("Database file does not exist") return health_status # Try to connect to database async with aiosqlite.connect(self.db_path) as db: health_status["database_accessible"] = True # Check if required tables exist required_tables = ['metadata_extractions', 'curricula', 'learning_content', 'api_cache'] existing_tables = await self._get_existing_tables(db) missing_tables = [table for table in required_tables if table not in existing_tables] if missing_tables: health_status["errors"].append(f"Missing tables: {missing_tables}") else: health_status["tables_exist"] = True # Check if views exist required_views = ['user_learning_journeys', 'curriculum_content_status'] existing_views = await self._get_existing_views(db) missing_views = [view for view in required_views if view not in existing_views] if missing_views: health_status["errors"].append(f"Missing views: {missing_views}") else: health_status["views_exist"] = True # Test write capability try: await db.execute("CREATE TEMPORARY TABLE test_write (id INTEGER)") await db.execute("DROP TABLE test_write") health_status["can_write"] = True except Exception as e: health_status["errors"].append(f"Cannot write to database: {str(e)}") # Get record counts if health_status["tables_exist"]: for table in required_tables: try: async with db.execute(f"SELECT COUNT(*) FROM {table}") as cursor: count = await cursor.fetchone() health_status["record_count"][table] = count[0] if count else 0 except Exception as e: health_status["record_count"][table] = f"Error: {str(e)}" health_status["schema_loaded"] = ( health_status["tables_exist"] and health_status["views_exist"] ) except Exception as e: health_status["errors"].append(f"Database connection error: {str(e)}") return health_status async def _get_existing_tables(self, db: aiosqlite.Connection) -> List[str]: """Get list of existing tables""" async with db.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """) as cursor: rows = await cursor.fetchall() return [row[0] for row in rows] async def _get_existing_views(self, db: aiosqlite.Connection) -> List[str]: """Get list of existing views""" async with db.execute(""" SELECT name FROM sqlite_master WHERE type='view' """) as cursor: rows = await cursor.fetchall() return [row[0] for row in rows] async def create_database(self) -> bool: """Create database file and initialize with schema""" try: logger.info(f"Creating database at: {self.db_path}") # Ensure directory exists db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) logger.info(f"Created directory: {db_dir}") # Create database and load schema async with aiosqlite.connect(self.db_path) as db: # Read schema file with open(self.schema_path, 'r') as f: schema = f.read() # Execute schema await db.executescript(schema) await db.commit() logger.info("Database created and schema loaded successfully") return True except Exception as e: logger.error(f"Error creating database: {str(e)}") return False async def initialize_database(self, force_recreate: bool = False) -> Dict[str, Any]: """Initialize database with comprehensive checks and creation""" result = { "success": False, "action_taken": "none", "health_check": {}, "errors": [] } try: # Check current database health health_check = await self.check_database_health() result["health_check"] = health_check # Determine if we need to create/recreate database needs_creation = ( not health_check["database_exists"] or not health_check["schema_loaded"] or force_recreate ) if needs_creation: if health_check["database_exists"] and force_recreate: # Backup existing database backup_path = f"{self.db_path}.backup" if os.path.exists(self.db_path): os.rename(self.db_path, backup_path) logger.info(f"Backed up existing database to: {backup_path}") result["action_taken"] = "recreated_with_backup" else: result["action_taken"] = "force_recreated" else: result["action_taken"] = "created" # Create database creation_success = await self.create_database() if not creation_success: result["errors"].append("Failed to create database") return result # Verify creation final_health = await self.check_database_health() result["health_check"] = final_health if final_health["schema_loaded"] and final_health["can_write"]: result["success"] = True logger.info("Database initialization completed successfully") else: result["errors"].append("Database created but health check failed") else: # Database exists and is healthy result["success"] = True result["action_taken"] = "already_exists" logger.info("Database already exists and is healthy") except Exception as e: error_msg = f"Database initialization error: {str(e)}" logger.error(error_msg) result["errors"].append(error_msg) return result async def repair_database(self) -> Dict[str, Any]: """Attempt to repair database issues""" result = { "success": False, "repairs_attempted": [], "errors": [] } try: health_check = await self.check_database_health() if not health_check["database_exists"]: # Database doesn't exist - create it creation_result = await self.initialize_database() result["repairs_attempted"].append("created_missing_database") result["success"] = creation_result["success"] result["errors"].extend(creation_result.get("errors", [])) return result # Database exists but has issues async with aiosqlite.connect(self.db_path) as db: # Check and repair missing tables if not health_check["tables_exist"]: with open(self.schema_path, 'r') as f: schema = f.read() await db.executescript(schema) await db.commit() result["repairs_attempted"].append("recreated_schema") # Verify repair final_health = await self.check_database_health() result["success"] = final_health["schema_loaded"] except Exception as e: error_msg = f"Database repair error: {str(e)}" logger.error(error_msg) result["errors"].append(error_msg) return result # Global instance db_initializer = DatabaseInitializer()