"""PostgreSQL persistence layer for recordings, transcriptions, domains, and
moderator validation assignments.

Tables are created dynamically per language (``recordings_<lang>``,
``transcriptions_<lang>``).  Because table names cannot be bound as query
parameters, language codes are validated before being interpolated into any
SQL identifier.
"""
from dotenv import load_dotenv

load_dotenv()

import os
import logging

from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    Integer,
    String,
    Float,
    Boolean,
    Text,
    TIMESTAMP,
    func,
    select,
    text,
)
from sqlalchemy.exc import SQLAlchemyError
from flask import current_app

from domain_subdomain import domains_and_subdomains  # Predefined domain data

logger = logging.getLogger(__name__)

DATABASE_URL = os.getenv("POSTGRES_URL")
if not DATABASE_URL:
    raise Exception("POSTGRES_URL environment variable not set")

engine = create_engine(DATABASE_URL)
metadata_db = MetaData()  # Removed bind parameter

# Cache of dynamically built per-language Table objects, keyed by table name.
tables_cache = {}
# Process-wide flag so domain/subdomain tables are only verified once.
_domain_tables_verified = False


def _validate_language(language):
    """Validate a language code before it is interpolated into a SQL identifier.

    Dynamic table names (``recordings_<lang>``, ``transcriptions_<lang>``)
    cannot use bound parameters, so restrict the code to alphanumerics and
    underscores to prevent SQL injection through the language field.

    Args:
        language: Language code supplied by the caller (often user input).

    Returns:
        The validated language code, unchanged.

    Raises:
        ValueError: If the code is empty or contains unsafe characters.
    """
    if not language or not all(ch.isalnum() or ch == '_' for ch in str(language)):
        raise ValueError(f"Invalid language code: {language!r}")
    return language


def get_language_table(language):
    """Get or create the language-specific recordings table.

    The Table object is cached in ``tables_cache``; the physical table is
    created in the database on first use.

    Args:
        language: Language code used to derive the table name.

    Returns:
        The SQLAlchemy ``Table`` for ``recordings_<language>``.
    """
    _validate_language(language)
    table_name = f"recordings_{language}"
    if table_name not in tables_cache:
        tables_cache[table_name] = Table(
            table_name, metadata_db,
            Column('id', Integer, primary_key=True, autoincrement=True),
            Column('user_id', String),
            Column('audio_filename', String),
            Column('transcription_id', Integer),  # Reference to transcriptions table
            Column('speaker_name', String),
            Column('speaker_id', String),
            Column('audio_path', String),
            Column('sampling_rate', Integer),
            Column('duration', Float),
            Column('language', String(2)),
            Column('gender', String(10)),
            Column('country', String),
            Column('state', String),
            Column('city', String),
            # Status string replaces the old 'verified' boolean.
            Column('status', String(20), default='pending'),
            Column('verified_by', String, nullable=True),
            Column('username', String),
            Column('age_group', String),
            Column('accent', String),
            Column('domain', String(10)),
            Column('subdomain', String(10)),
            extend_existing=True,
        )
        try:
            metadata_db.create_all(engine, tables=[tables_cache[table_name]])
            logger.info(f"Created table: {table_name}")
        except Exception as e:
            logger.error(f"Error creating table {table_name}: {e}")
            raise
    return tables_cache[table_name]


def store_metadata(metadata_dict):
    """Store recording metadata in appropriate language table

    Ensures the recordings and transcriptions tables exist (adding the
    domain/subdomain columns when missing), keeps only keys that match real
    columns, normalizes empty strings to NULL, forces status to 'pending',
    and inserts the row.

    Args:
        metadata_dict: Mapping of column name -> value; must include 'language'.

    Returns:
        Primary key of the inserted recording.

    Raises:
        ValueError: If 'language' is missing or unsafe.
    """
    try:
        language = metadata_dict.get('language')
        if not language:
            raise ValueError("Language is required in metadata")
        _validate_language(language)

        with engine.connect() as conn:
            # Create recordings table first (also registers it in the cache).
            recordings_table = get_language_table(language)
            try:
                metadata_db.create_all(bind=conn, tables=[recordings_table])
                logger.info(f"Ensured table exists: recordings_{language}")
            except Exception as e:
                logger.warning(f"Table creation warning (may already exist): {e}")

            # Ensure transcription table exists
            ensure_transcription_table(conn, language)

            # Older deployments may predate the domain/subdomain columns.
            domain_exists = conn.execute(text(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_name = 'recordings_{language}'
                    AND column_name = 'domain'
                )
            """)).scalar()

            if not domain_exists:
                conn.execute(text(f"""
                    ALTER TABLE recordings_{language}
                    ADD COLUMN domain VARCHAR(10),
                    ADD COLUMN subdomain VARCHAR(10)
                """))
                conn.commit()
                logger.info(f"Added domain and subdomain columns to recordings_{language}")

            # Drop unknown keys and convert empty strings to NULL.
            valid_columns = [c.name for c in recordings_table.columns]
            cleaned_metadata = {
                k: (v if v != '' else None)
                for k, v in metadata_dict.items()
                if k in valid_columns
            }
            cleaned_metadata['status'] = 'pending'  # New recordings always start pending

            logger.debug(f"Storing metadata with columns: {list(cleaned_metadata.keys())}")

            result = conn.execute(recordings_table.insert().values(**cleaned_metadata))
            conn.commit()

            logger.info(f"Successfully stored metadata for recording: {cleaned_metadata.get('audio_filename')}")
            return result.inserted_primary_key[0]
    except Exception as e:
        logger.error(f"Error in store_metadata: {str(e)}", exc_info=True)
        raise


def store_transcription(transcription_text, language):
    """Store transcription in language-specific table

    Creates ``transcriptions_<language>`` if needed, then inserts the text.

    Args:
        transcription_text: The transcription body to store.
        language: Language code selecting the target table.

    Returns:
        The new transcription_id.

    Raises:
        Exception: Wraps any database error with a "Database error:" message.
    """
    _validate_language(language)
    try:
        with engine.connect() as conn:
            # Create language-specific transcriptions table if it doesn't exist.
            create_table_query = text(f"""
                CREATE TABLE IF NOT EXISTS transcriptions_{language} (
                    transcription_id SERIAL PRIMARY KEY,
                    user_id VARCHAR(255),
                    transcription_text TEXT NOT NULL,
                    recorded BOOLEAN DEFAULT false,
                    domain VARCHAR(10),
                    subdomain VARCHAR(10),
                    uploaded_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute(create_table_query)

            insert_query = text(f"""
                INSERT INTO transcriptions_{language} (transcription_text)
                VALUES (:transcription_text)
                RETURNING transcription_id
            """)
            result = conn.execute(insert_query, {"transcription_text": transcription_text})
            conn.commit()
            return result.scalar()
    except Exception as e:
        logger.error(f"Error storing transcription: {e}")
        raise Exception(f"Database error: {str(e)}")


def get_available_languages():
    """Get list of languages that have transcriptions available

    Scans information_schema for tables named ``transcriptions_*`` and
    returns the language-code suffixes.
    """
    try:
        with engine.connect() as conn:
            result = conn.execute(text("""
                SELECT DISTINCT table_name
                FROM information_schema.tables
                WHERE table_name LIKE 'transcriptions_%'
            """))
            languages = [
                table_name.replace('transcriptions_', '')
                for (table_name,) in result
            ]
            logger.debug(f"Found languages in DB: {languages}")
            return languages
    except Exception as e:
        logger.error(f"Database error in get_available_languages: {str(e)}")
        raise Exception(f"Database error: {str(e)}")


def ensure_transcription_table(conn, language):
    """Ensure transcription table exists with correct schema

    If the table exists, backfills the 'recorded' and 'domain'/'subdomain'
    columns when missing; otherwise creates the table with the full schema.

    Args:
        conn: Open SQLAlchemy connection.
        language: Language code selecting the table.
    """
    _validate_language(language)
    try:
        table_exists_flag = conn.execute(text(f"""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'transcriptions_{language}'
            )
        """)).scalar()

        if table_exists_flag:
            # Backfill 'recorded' column if it predates this schema.
            column_exists = conn.execute(text(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_name = 'transcriptions_{language}'
                    AND column_name = 'recorded'
                )
            """)).scalar()
            if not column_exists:
                conn.execute(text(f"""
                    ALTER TABLE transcriptions_{language}
                    ADD COLUMN recorded BOOLEAN DEFAULT false
                """))
                conn.commit()
                logger.info(f"Added recorded column to transcriptions_{language}")

            # Backfill domain/subdomain columns if missing.
            domain_exists = conn.execute(text(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_name = 'transcriptions_{language}'
                    AND column_name = 'domain'
                )
            """)).scalar()
            if not domain_exists:
                conn.execute(text(f"""
                    ALTER TABLE transcriptions_{language}
                    ADD COLUMN domain VARCHAR(10),
                    ADD COLUMN subdomain VARCHAR(10)
                """))
                conn.commit()
                logger.info(f"Added domain and subdomain columns to transcriptions_{language}")
        else:
            # Create new table with all columns
            conn.execute(text(f"""
                CREATE TABLE transcriptions_{language} (
                    transcription_id SERIAL PRIMARY KEY,
                    user_id VARCHAR(255),
                    transcription_text TEXT NOT NULL,
                    recorded BOOLEAN DEFAULT false,
                    domain VARCHAR(10),
                    subdomain VARCHAR(10),
                    uploaded_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """))
            conn.commit()
            logger.info(f"Created table: transcriptions_{language}")
    except Exception as e:
        logger.error(f"Error ensuring transcription table: {e}")
        raise


def get_transcriptions_for_language(language_code, include_recorded=False, limit=None,
                                    offset=0, exclude_ids=None, count_only=False,
                                    ids_only=False, specific_ids=None, domain=None,
                                    subdomain=None):
    """
    Get transcriptions for a language with various filtering options

    Args:
        language_code (str): Language code to fetch transcriptions for
        include_recorded (bool): Whether to include already recorded transcriptions
        limit (int, optional): Maximum number of transcriptions to return
        offset (int, optional): Offset for pagination
        exclude_ids (list, optional): IDs to exclude from results
        count_only (bool): Return only the count of matching transcriptions
        ids_only (bool): Return only the transcript IDs without content
        specific_ids (list, optional): Specific transcript IDs to fetch
        domain (str, optional): Filter by specific domain
        subdomain (str, optional): Filter by specific subdomain

    Returns:
        ``{'count': n}`` when count_only, a list of ids when ids_only,
        otherwise a list of dicts with id/text/recorded/domain/subdomain.
    """
    try:
        with engine.connect() as conn:
            ensure_transcription_table(conn, language_code)

            # Fast path: only the count is needed.
            if count_only:
                where_clauses = []
                if not include_recorded:
                    where_clauses.append("recorded = false")
                if domain:
                    where_clauses.append("domain = :domain")
                if subdomain:
                    where_clauses.append("subdomain = :subdomain")
                where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

                count_query = text(f"""
                    SELECT COUNT(*) FROM transcriptions_{language_code}
                    {where_clause}
                """)
                params = {}
                if domain:
                    params['domain'] = domain
                if subdomain:
                    params['subdomain'] = subdomain
                count = conn.execute(count_query, params).scalar() or 0
                return {'count': count}

            select_clause = ("SELECT transcription_id" if ids_only
                             else "SELECT transcription_id, transcription_text, recorded, domain, subdomain")

            where_clauses = []
            if not include_recorded:
                where_clauses.append("recorded = false")
            if domain:
                where_clauses.append("domain = :domain")
            if subdomain:
                where_clauses.append("subdomain = :subdomain")
            # ID filters use numbered named binds so the lists stay parameterized.
            if exclude_ids and len(exclude_ids) > 0:
                placeholders = ','.join([f':exclude_id_{i}' for i in range(len(exclude_ids))])
                where_clauses.append(f"transcription_id NOT IN ({placeholders})")
            if specific_ids and len(specific_ids) > 0:
                placeholders = ','.join([f':specific_id_{i}' for i in range(len(specific_ids))])
                where_clauses.append(f"transcription_id IN ({placeholders})")

            where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
            limit_clause = "LIMIT :limit" if limit is not None else ""
            offset_clause = "OFFSET :offset" if offset > 0 else ""

            query = text(f"""
                {select_clause}
                FROM transcriptions_{language_code}
                {where_clause}
                ORDER BY transcription_id
                {limit_clause}
                {offset_clause}
            """)

            params = {}
            if limit is not None:
                params['limit'] = limit
            if offset > 0:
                params['offset'] = offset
            if domain:
                params['domain'] = domain
            if subdomain:
                params['subdomain'] = subdomain
            if exclude_ids and len(exclude_ids) > 0:
                for i, id_val in enumerate(exclude_ids):
                    params[f'exclude_id_{i}'] = id_val
            if specific_ids and len(specific_ids) > 0:
                for i, id_val in enumerate(specific_ids):
                    params[f'specific_id_{i}'] = id_val

            result = conn.execute(query, params)

            if ids_only:
                return [row[0] for row in result]

            transcriptions = [
                {"id": row[0], "text": row[1], "recorded": row[2],
                 "domain": row[3], "subdomain": row[4]}
                for row in result
            ]

            if not transcriptions and not ids_only:
                cond = "unrecorded " if not include_recorded else ""
                domain_info = f" for domain '{domain}'" if domain else ""
                subdomain_info = f" and subdomain '{subdomain}'" if subdomain else ""
                logger.warning(f"No {cond}transcriptions found for language: {language_code}{domain_info}{subdomain_info}")
                return []

            return transcriptions
    except Exception as e:
        logger.error(f"Database error in get_transcriptions_for_language: {str(e)}")
        raise


def table_exists(conn, table_name):
    """Check if a table exists in the database"""
    exists_query = text("""
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = :table_name
        )
    """)
    result = conn.execute(exists_query, {"table_name": table_name})
    return result.scalar()


def get_dataset_stats():
    """Get dataset statistics from PostgreSQL

    Aggregates totals (recordings, verified, duration in seconds, distinct
    users, transcripts) across every per-language table, plus a per-language
    breakdown.  Returns zeroed stats on any error rather than raising.
    """
    try:
        with engine.connect() as conn:
            stats = {
                'total_recordings': 0,
                'total_verified': 0,
                'languages': {},
                'total_duration': 0,  # Seconds
                'total_users': 0,
                'total_languages': 0,
                'total_transcripts': 0,
            }

            # Total transcripts across every transcriptions_* table.
            transcripts_query = text("""
                SELECT table_name FROM information_schema.tables
                WHERE table_name LIKE 'transcriptions_%'
            """)
            transcript_tables = conn.execute(transcripts_query).fetchall()
            total_transcripts = 0
            for (table_name,) in transcript_tables:
                count_query = text(f"SELECT COUNT(*) FROM {table_name}")
                count = conn.execute(count_query).scalar() or 0
                total_transcripts += count
            stats['total_transcripts'] = total_transcripts

            languages = get_available_languages()
            if not languages:
                logger.warning("No language tables found")
                return stats

            # Create any missing recordings tables before querying them.
            for lang in languages:
                table_name = f"recordings_{lang}"
                if not table_exists(conn, table_name):
                    logger.info(f"Creating table for language: {lang}")
                    get_language_table(lang)

            # Build one UNION ALL over every existing recordings table so the
            # global totals (including DISTINCT user count) come from a single query.
            query_parts = []
            existing_tables = []
            for lang in languages:
                table_name = f"recordings_{lang}"
                if table_exists(conn, table_name):
                    query_parts.append(f"""
                        SELECT status, duration, user_id, '{lang}' as language
                        FROM {table_name}
                    """)
                    existing_tables.append((table_name, lang))

            if query_parts:
                union_query = " UNION ALL ".join(query_parts)
                stats_query = text(f"""
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN status = 'verified' THEN 1 ELSE 0 END) as verified,
                        SUM(duration) as total_duration,
                        COUNT(DISTINCT user_id) as total_users
                    FROM ({union_query}) all_recordings
                """)
                row = conn.execute(stats_query).fetchone()
                if row:
                    stats['total_recordings'] = row[0]
                    stats['total_verified'] = row[1]
                    stats['total_duration'] = float(row[2] or 0)
                    stats['total_users'] = row[3]

            # Per-language statistics including available transcript counts.
            for table_name, lang in existing_tables:
                try:
                    result = conn.execute(text(f"""
                        SELECT
                            COUNT(*) as total,
                            COUNT(DISTINCT user_id) as total_users,
                            SUM(CASE WHEN status = 'verified' THEN 1 ELSE 0 END) as verified,
                            SUM(duration) as duration
                        FROM {table_name}
                    """))
                    row = result.fetchone()

                    trans_result = conn.execute(text(f"""
                        SELECT COUNT(*) FROM transcriptions_{lang}
                    """))
                    transcripts_count = trans_result.scalar() or 0

                    if row:
                        stats['languages'][lang] = {
                            'recordings': row[0],
                            'total_users': row[1],
                            'verified': row[2],
                            'total_duration': float(row[3] or 0),
                            'available_transcripts': transcripts_count,
                        }
                except Exception as e:
                    # One broken table should not sink the whole report.
                    logger.warning(f"Could not get stats for {table_name}: {e}")
                    stats['languages'][lang] = {
                        'recordings': 0,
                        'total_users': 0,
                        'verified': 0,
                        'total_duration': 0.0,
                        'available_transcripts': 0,
                    }
                    continue

            stats['total_languages'] = len(stats['languages'])
            return stats
    except Exception as e:
        logger.error(f"Error getting dataset stats: {e}")
        return {
            'total_recordings': 0,
            'total_verified': 0,
            'languages': {},
            'total_duration': 0,
            'total_users': 0,
            'total_languages': 0,
            'total_transcripts': 0,
        }


def create_assignments_table(conn):
    """Create a single assignments table with one row per user

    BUG FIX: the previous implementation ran DROP TABLE + CREATE every call,
    but assign_recording() invokes this on every assignment request — wiping
    all in-flight assignments and making the existing-assignment lookup and
    ON CONFLICT upsert downstream pointless.  Now it only creates the table
    when it is missing.
    """
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS validation_assignments (
            id SERIAL PRIMARY KEY,
            assigned_to VARCHAR(255) NOT NULL,  -- Changed from user_id to match column name
            language VARCHAR(2) NOT NULL,
            recording_id INTEGER NOT NULL,
            assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            status VARCHAR(20) DEFAULT 'pending',
            UNIQUE (assigned_to)  -- Ensure only one row per user
        )
    """))
    conn.commit()


def cleanup_completed_assignments():
    """Remove completed assignments from the table"""
    try:
        with engine.connect() as conn:
            conn.execute(text("""
                DELETE FROM validation_assignments
                WHERE status IN ('completed_verified', 'completed_rejected')
            """))
            conn.commit()
    except Exception as e:
        # Best-effort cleanup: log and continue rather than failing the caller.
        logger.error(f"Error cleaning up assignments: {e}")
def assign_recording(language, moderator_id, domain='', subdomain=''):
    """Get next unassigned recording or update existing assignment with domain/subdomain filtering

    Flow: verify the recordings table exists and has pending rows matching
    the filters; expire stale assignments; reuse the moderator's existing
    assignment when it still matches the filters (releasing it otherwise);
    then atomically pick the next unassigned pending recording
    (FOR UPDATE SKIP LOCKED guards against concurrent moderators) and upsert
    the assignment with a 10-minute expiry.

    Args:
        language: Language code selecting the recordings table.
        moderator_id: Moderator requesting work.
        domain: Optional domain filter ('' means no filter).
        subdomain: Optional subdomain filter ('' means no filter).

    Returns:
        A row mapping with all recording columns plus transcription_text,
        or None when nothing is available.
    """
    try:
        with engine.connect() as conn:
            # First check if recordings table exists
            if not table_exists(conn, f"recordings_{language}"):
                logger.info(f"No recordings table exists for language: {language}")
                return None

            # Check if there are any pending recordings with these filters
            filters_check_query = text(f"""
                SELECT COUNT(*) FROM recordings_{language}
                WHERE status = 'pending'
                {" AND domain = :domain" if domain else ""}
                {" AND subdomain = :subdomain" if subdomain else ""}
            """)
            matching_recordings = conn.execute(filters_check_query, {
                "domain": domain,
                "subdomain": subdomain
            }).scalar()

            if matching_recordings == 0:
                domain_info = f" for domain '{domain}'" if domain else ""
                subdomain_info = f" and subdomain '{subdomain}'" if subdomain else ""
                logger.info(f"No pending recordings found{domain_info}{subdomain_info}")
                return None

            # Create assignments table if it doesn't exist
            create_assignments_table(conn)
            # Clean up completed assignments periodically
            cleanup_completed_assignments()

            # Clear expired assignments
            conn.execute(text("""
                UPDATE validation_assignments
                SET status = 'expired'
                WHERE expires_at < NOW()
            """))

            # Check if user has an existing assignment
            existing = conn.execute(text("""
                SELECT recording_id, language FROM validation_assignments
                WHERE assigned_to = :moderator_id
            """), {
                "moderator_id": moderator_id
            }).first()

            if existing and existing.language == language:
                # Does the existing assignment still satisfy the requested filters?
                match_query = text(f"""
                    SELECT r.id FROM recordings_{language} r
                    WHERE r.id = :recording_id
                    AND (:domain = '' OR r.domain = :domain)
                    AND (:subdomain = '' OR r.subdomain = :subdomain)
                """)
                matches_filters = conn.execute(match_query, {
                    "recording_id": existing.recording_id,
                    "domain": domain,
                    "subdomain": subdomain
                }).scalar() is not None

                if matches_filters:
                    # Return existing assignment as it matches the filters
                    result = conn.execute(text(f"""
                        SELECT r.*, t.transcription_text
                        FROM recordings_{language} r
                        LEFT JOIN transcriptions_{language} t
                            ON r.transcription_id = t.transcription_id
                        WHERE r.id = :recording_id
                    """), {
                        "recording_id": existing.recording_id
                    })
                    return result.mappings().first()
                else:
                    # Release the existing assignment as it doesn't match the filters
                    logger.info(f"Releasing assignment for user {moderator_id} as it doesn't match new filters")
                    conn.execute(text("""
                        DELETE FROM validation_assignments
                        WHERE assigned_to = :moderator_id
                    """), {
                        "moderator_id": moderator_id
                    })
                    conn.commit()  # Make sure to commit the DELETE

            domain_condition = "AND r.domain = :domain" if domain else ""
            subdomain_condition = "AND r.subdomain = :subdomain" if subdomain else ""
            logger.debug(f"Looking for recordings with filters - domain: '{domain}', subdomain: '{subdomain}'")

            # Pick the next pending recording not held by a live assignment.
            # FOR UPDATE SKIP LOCKED lets concurrent moderators each grab a
            # different row without blocking each other.
            next_recording = conn.execute(text(f"""
                WITH assigned_recordings AS (
                    SELECT recording_id FROM validation_assignments
                    WHERE status = 'pending' AND expires_at > NOW()
                )
                SELECT r.id FROM recordings_{language} r
                WHERE r.status = 'pending'
                AND NOT EXISTS (
                    SELECT 1 FROM assigned_recordings ar
                    WHERE ar.recording_id = r.id
                )
                {domain_condition}
                {subdomain_condition}
                ORDER BY r.id
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """), {
                "domain": domain,
                "subdomain": subdomain
            }).scalar()

            if not next_recording:
                domain_info = f" with domain '{domain}'" if domain else ""
                subdomain_info = f" and subdomain '{subdomain}'" if subdomain else ""
                logger.info(f"No unassigned recordings found{domain_info}{subdomain_info}")
                return None

            # Insert or update assignment (one row per moderator).
            conn.execute(text("""
                INSERT INTO validation_assignments
                    (assigned_to, language, recording_id, expires_at, status)
                VALUES
                    (:moderator_id, :language, :recording_id,
                     NOW() + INTERVAL '10 minutes', 'pending')
                ON CONFLICT (assigned_to)
                DO UPDATE SET
                    language = EXCLUDED.language,
                    recording_id = EXCLUDED.recording_id,
                    expires_at = EXCLUDED.expires_at,
                    status = 'pending'
            """), {
                "moderator_id": moderator_id,
                "language": language,
                "recording_id": next_recording
            })

            # Get full recording data including transcription
            result = conn.execute(text(f"""
                SELECT r.*, t.transcription_text
                FROM recordings_{language} r
                LEFT JOIN transcriptions_{language} t
                    ON r.transcription_id = t.transcription_id
                WHERE r.id = :recording_id
            """), {
                "recording_id": next_recording
            })
            recording = result.mappings().first()
            conn.commit()
            return recording
    except Exception as e:
        logger.error(f"Error assigning recording: {e}")
        raise


def complete_assignment(language, recording_id, moderator_id, status):
    """Mark an assignment as completed and remove it - optimized version

    BUG FIX: the previous single-statement form
    ``WITH completed AS (UPDATE ...) DELETE ...`` never removed the row it
    had just updated — in PostgreSQL the outer DELETE reads the snapshot
    taken at statement start, so it cannot see the CTE's UPDATE, and the
    "completed" assignment lingered until some later call's cleanup.  The
    UPDATE and DELETE now run as two statements inside one transaction.

    Args:
        language: Language of the assignment being completed.
        recording_id: Recording the moderator worked on.
        moderator_id: Moderator finishing the assignment.
        status: New status, e.g. 'completed_verified' or 'completed_rejected'.
    """
    try:
        # engine.begin() commits both statements atomically (or rolls back).
        with engine.begin() as conn:
            conn.execute(text("""
                UPDATE validation_assignments
                SET status = :status
                WHERE assigned_to = :moderator_id
                AND recording_id = :recording_id
                AND language = :language
            """), {
                "status": status,
                "moderator_id": moderator_id,
                "recording_id": recording_id,
                "language": language
            })
            conn.execute(text("""
                DELETE FROM validation_assignments
                WHERE status IN ('completed_verified', 'completed_rejected')
            """))
    except Exception as e:
        logger.error(f"Error completing assignment: {e}")
        raise


def ensure_domain_tables(conn):
    """Ensure domain and subdomain tables exist in PostgreSQL."""
    global _domain_tables_verified

    # Skip the round-trips once verified for this process.
    if _domain_tables_verified:
        return True

    try:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS domains (
                code VARCHAR(10) PRIMARY KEY,
                name VARCHAR(100) NOT NULL
            )
        """))
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS subdomains (
                id SERIAL PRIMARY KEY,
                mnemonic VARCHAR(10) NOT NULL,
                name VARCHAR(100) NOT NULL,
                domain_code VARCHAR(10) NOT NULL REFERENCES domains(code),
                UNIQUE(domain_code, mnemonic)
            )
        """))
        conn.commit()
        logger.info("Domain and subdomain tables created or verified")

        _domain_tables_verified = True
        return True
    except Exception as e:
        logger.error(f"Error creating domain tables: {e}")
        conn.rollback()
        return False


def initialize_domain_data():
    """Initialize domain and subdomain tables with data from domain_subdomain.py

    Populates the tables only when the domains table is empty; returns True
    on success (including the already-populated case), False on failure.
    """
    try:
        with engine.connect() as conn:
            if not ensure_domain_tables(conn):
                logger.error("Failed to create domain tables")
                return False

            domain_count = conn.execute(text("SELECT COUNT(*) FROM domains")).scalar()

            if domain_count == 0:
                logger.info("Domain table is empty. Populating with predefined data...")

                for code, name in domains_and_subdomains["domains"].items():
                    conn.execute(text(
                        "INSERT INTO domains (code, name) VALUES (:code, :name)"
                    ), {"code": code, "name": name})

                for domain_code, subdomains in domains_and_subdomains["subdomains"].items():
                    for subdomain in subdomains:
                        conn.execute(text("""
                            INSERT INTO subdomains (mnemonic, name, domain_code)
                            VALUES (:mnemonic, :name, :domain_code)
                        """), {
                            "mnemonic": subdomain["mnemonic"],
                            "name": subdomain["name"],
                            "domain_code": domain_code
                        })
                conn.commit()
                logger.info("Domain and subdomain data populated successfully")
            else:
                logger.debug("Domain data already exists in database")
            return True
    except Exception as e:
        logger.error(f"Error initializing domain data: {str(e)}")
        return False


def get_all_domains_db():
    """Get all domains from the database.

    Returns a {code: name} dict; falls back to the predefined domains when
    the table is empty or the query fails.
    """
    try:
        with engine.connect() as conn:
            ensure_domain_tables(conn)
            result = conn.execute(text("SELECT code, name FROM domains"))
            domains = {row[0]: row[1] for row in result}
            if not domains:
                logger.warning("No domains found in database, returning predefined domains")
                return domains_and_subdomains["domains"]
            return domains
    except Exception as e:
        logger.error(f"Error getting all domains: {e}")
        # Fallback to predefined domains
        return domains_and_subdomains["domains"]


def get_domain_name_db(domain_code):
    """Get domain name from its code.

    Checks the database first, then the predefined data; returns None when
    the code is unknown to both.
    """
    try:
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT name FROM domains WHERE code = :code"
            ), {"code": domain_code})
            row = result.fetchone()
            if row:
                return row[0]
            if domain_code in domains_and_subdomains["domains"]:
                return domains_and_subdomains["domains"][domain_code]
            return None
    except Exception as e:
        logger.error(f"Error getting domain name: {e}")
        # Fallback to predefined domains
        return domains_and_subdomains["domains"].get(domain_code)


def get_domain_subdomains_db(domain_code):
    """Get all subdomains for a specific domain.

    Returns a list of {'mnemonic', 'name'} dicts, falling back to the
    predefined data when the database has none for this domain.
    """
    try:
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT mnemonic, name FROM subdomains WHERE domain_code = :code"
            ), {"code": domain_code})
            subdomains = [{"mnemonic": row[0], "name": row[1]} for row in result]
            if not subdomains and domain_code in domains_and_subdomains["subdomains"]:
                return domains_and_subdomains["subdomains"][domain_code]
            return subdomains
    except Exception as e:
        logger.error(f"Error getting domain subdomains: {e}")
        # Fallback to predefined subdomains
        return domains_and_subdomains["subdomains"].get(domain_code, [])


def get_subdomain_by_mnemonic_db(domain_code, subdomain_mnemonic):
    """Get subdomain information by its mnemonic within a domain.

    Database first, then the predefined data; None when not found.
    """
    try:
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT mnemonic, name FROM subdomains "
                "WHERE domain_code = :code AND mnemonic = :mnemonic"
            ), {"code": domain_code, "mnemonic": subdomain_mnemonic})
            row = result.fetchone()
            if row:
                return {"mnemonic": row[0], "name": row[1]}
            for subdomain in domains_and_subdomains["subdomains"].get(domain_code, []):
                if subdomain["mnemonic"] == subdomain_mnemonic:
                    return subdomain
            return None
    except Exception as e:
        logger.error(f"Error getting subdomain by mnemonic: {e}")
        # Fallback to predefined subdomains
        for subdomain in domains_and_subdomains["subdomains"].get(domain_code, []):
            if subdomain["mnemonic"] == subdomain_mnemonic:
                return subdomain
        return None


def search_subdomain_db(query, domain_code=None):
    """Search for a subdomain by name or mnemonic across all domains or a specific domain.

    Case-insensitive substring match; returns a list of
    {'domain', 'domain_name', 'subdomain': {...}} dicts ([] on error).
    """
    try:
        with engine.connect() as conn:
            if domain_code:
                result = conn.execute(text("""
                    SELECT s.mnemonic, s.name, s.domain_code, d.name
                    FROM subdomains s
                    JOIN domains d ON s.domain_code = d.code
                    WHERE s.domain_code = :domain_code
                    AND (LOWER(s.name) LIKE :query OR LOWER(s.mnemonic) LIKE :query)
                """), {"domain_code": domain_code, "query": f"%{query.lower()}%"})
            else:
                result = conn.execute(text("""
                    SELECT s.mnemonic, s.name, s.domain_code, d.name
                    FROM subdomains s
                    JOIN domains d ON s.domain_code = d.code
                    WHERE LOWER(s.name) LIKE :query OR LOWER(s.mnemonic) LIKE :query
                """), {"query": f"%{query.lower()}%"})

            results = []
            for row in result:
                results.append({
                    "domain": row[2],
                    "domain_name": row[3],
                    "subdomain": {"mnemonic": row[0], "name": row[1]}
                })
            return results
    except Exception as e:
        logger.error(f"Error searching subdomains: {e}")
        return []


def get_available_domains():
    """Get list of available domains from transcriptions in database

    Unions DISTINCT domain values across every transcriptions_* table that
    has a domain column; falls back to the predefined domain codes when no
    usable tables or values exist.
    """
    try:
        with engine.connect() as conn:
            ensure_domain_tables(conn)

            existing_tables = conn.execute(text("""
                SELECT table_name FROM information_schema.tables
                WHERE table_name LIKE 'transcriptions_%'
                AND table_schema = 'public'
            """)).fetchall()

            if not existing_tables:
                logger.warning("No transcription tables found in database")
                # Return all domains from domain_subdomain.py as a fallback
                return list(domains_and_subdomains["domains"].keys())

            query_parts = []
            for (table_name,) in existing_tables:
                # Only include tables that actually have a domain column.
                column_exists = conn.execute(text("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.columns
                        WHERE table_name = :table_name
                        AND column_name = 'domain'
                    )
                """), {"table_name": table_name}).scalar()
                if column_exists:
                    query_parts.append(f"""
                        SELECT DISTINCT domain FROM {table_name}
                        WHERE domain IS NOT NULL AND domain != ''
                    """)

            if query_parts:
                domains_query = text(" UNION ".join(query_parts))
                domain_results = conn.execute(domains_query).fetchall()
                domains = [domain[0] for domain in domain_results if domain[0]]
                if domains:
                    return domains

            logger.warning("No transcription tables with domain column found")
            # Return all domains from domain_subdomain.py as a fallback
            return list(domains_and_subdomains["domains"].keys())
    except Exception as e:
        logger.error(f"Error getting available domains from transcriptions: {e}")
        # Return all domains from domain_subdomain.py as a fallback
        return list(domains_and_subdomains["domains"].keys())


def get_available_subdomains(domain_code):
    """Get list of available subdomains for a domain from transcriptions in database

    Unions DISTINCT subdomain values (for the given domain) across every
    transcriptions_* table that has both domain and subdomain columns;
    falls back to predefined mnemonics for the domain otherwise.
    """
    try:
        with engine.connect() as conn:
            existing_tables = conn.execute(text("""
                SELECT table_name FROM information_schema.tables
                WHERE table_name LIKE 'transcriptions_%'
                AND table_schema = 'public'
            """)).fetchall()

            if not existing_tables:
                logger.warning("No transcription tables found in database")
                # Return all subdomains for this domain as fallback
                return [sd["mnemonic"] for sd in domains_and_subdomains["subdomains"].get(domain_code, [])]

            query_parts = []
            for (table_name,) in existing_tables:
                # Require both domain and subdomain columns in one check.
                columns_exist = conn.execute(text("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.columns
                        WHERE table_name = :table_name
                        AND column_name = 'domain'
                    ) AND EXISTS (
                        SELECT FROM information_schema.columns
                        WHERE table_name = :table_name
                        AND column_name = 'subdomain'
                    )
                """), {"table_name": table_name}).scalar()
                if columns_exist:
                    query_parts.append(f"""
                        SELECT DISTINCT subdomain FROM {table_name}
                        WHERE domain = :domain_code
                        AND subdomain IS NOT NULL AND subdomain != ''
                    """)

            if query_parts:
                subdomains_query = text(" UNION ".join(query_parts))
                subdomain_results = conn.execute(
                    subdomains_query, {"domain_code": domain_code}
                ).fetchall()
                subdomains = [subdomain[0] for subdomain in subdomain_results if subdomain[0]]
                if subdomains:
                    return subdomains

            logger.warning(f"No transcription tables with domain and subdomain columns found")
            # Return all subdomains for this domain as fallback
            return [sd["mnemonic"] for sd in domains_and_subdomains["subdomains"].get(domain_code, [])]
    except Exception as e:
        logger.error(f"Error getting available subdomains from transcriptions: {e}")
        # Return all subdomains for this domain as fallback
        return [sd["mnemonic"] for sd in domains_and_subdomains["subdomains"].get(domain_code, [])]


# Initialize domain data when module is loaded; failures are deferred, not fatal.
try:
    initialize_domain_data()
except Exception as e:
    logger.warning(f"Could not initialize domain data: {e}. Will try again when needed.")