import logging
import os
import re
import shutil
import threading
from functools import wraps
from pathlib import Path

import pandas as pd
from filelock import FileLock  # Remove if not needed
from flask import (
    Blueprint,
    abort,
    current_app,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from sqlalchemy import text

from database_manager import (
    assign_recording,
    complete_assignment,
    engine,
    table_exists,
)
from language_config import get_all_languages

logger = logging.getLogger(__name__)

# Blueprint with an explicit url_prefix: every route below lives under /validation.
validation = Blueprint('validation', __name__, url_prefix='/validation')

# Language codes are spliced into table names (recordings_<code>), which cannot
# be bound as SQL parameters, so only identifier-safe characters are accepted.
_LANGUAGE_CODE_RE = re.compile(r'[A-Za-z0-9_]+')


def login_required(f):
    """Reject the request with a JSON 401 unless the session holds a user.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that calls ``f`` only when ``session['user']`` is truthy.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('user'):
            return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401
        return f(*args, **kwargs)
    return decorated_function


@validation.route('/')
@login_required
def validate():
    """Render the validation page for moderators and admins.

    The user's role is refreshed from PocketBase first; if that lookup fails,
    the role already stored in the session is used instead.

    Returns:
        The rendered ``validate.html`` page, or ``error.html`` with status 403
        (not a moderator) or 500 (unexpected failure).
    """
    try:
        user = session.get('user', {})

        # Refresh the role only when the validation interface is opened.
        try:
            pb_user = current_app.pb.collection('users').get_one(user['id'])
            user['role'] = getattr(pb_user, 'role', user.get('role', ''))
            user['is_moderator'] = user['role'] in ['moderator', 'admin']
            session['user'] = user
        except Exception as e:
            # Best effort: continue with the existing session data.
            logger.error(f"Error updating user role: {e}")

        if not user.get('is_moderator', False):
            return render_template(
                'error.html',
                error_code=403,
                error_message="You don't have permission to access this page",
            ), 403

        languages = get_all_languages()
        return render_template('validate.html', languages=languages)
    except Exception as e:
        logger.error(f"Error in validate route: {e}")
        return render_template(
            'error.html',
            error_code=500,
            error_message="An internal server error occurred",
        ), 500


def ensure_language_tables(conn, language):
    """Ensure both recordings and transcriptions tables exist for the language.

    Missing tables are created. A recordings table that already existed is
    additionally given ``domain`` and ``subdomain`` columns if ``domain`` is
    absent.

    Args:
        conn: Database connection used to run (and commit) the statements.
        language: Language code used as the table-name suffix.

    Raises:
        ValueError: If ``language`` is not a non-empty string made only of
            ASCII letters, digits and underscores.
    """
    if not isinstance(language, str) or not _LANGUAGE_CODE_RE.fullmatch(language):
        raise ValueError(f"Invalid language code: {language!r}")

    recordings_table = f"recordings_{language}"
    transcriptions_table = f"transcriptions_{language}"

    tables_query = text("""
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_name = :table_name
        )
    """)
    recordings_exists = conn.execute(
        tables_query, {"table_name": recordings_table}
    ).scalar()
    transcriptions_exists = conn.execute(
        tables_query, {"table_name": transcriptions_table}
    ).scalar()

    if not recordings_exists or not transcriptions_exists:
        logger.info(f"Creating missing tables for language: {language}")

        if not recordings_exists:
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {recordings_table} (
                    id SERIAL PRIMARY KEY,
                    user_id VARCHAR,
                    audio_filename VARCHAR,
                    transcription_id INTEGER,
                    speaker_name VARCHAR,
                    speaker_id VARCHAR,
                    audio_path VARCHAR,
                    sampling_rate INTEGER,
                    duration FLOAT,
                    language VARCHAR(2),
                    gender VARCHAR(10),
                    country VARCHAR,
                    state VARCHAR,
                    city VARCHAR,
                    status VARCHAR(20) DEFAULT 'pending',
                    verified_by VARCHAR,
                    username VARCHAR,
                    age_group VARCHAR,
                    accent VARCHAR,
                    domain VARCHAR(10),
                    subdomain VARCHAR(10)
                )
            """))

        if not transcriptions_exists:
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {transcriptions_table} (
                    transcription_id SERIAL PRIMARY KEY,
                    user_id VARCHAR(255),
                    transcription_text TEXT NOT NULL,
                    uploaded_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """))

        conn.commit()

    # A recordings table that predates the domain/subdomain columns must be
    # migrated even when the transcriptions table had to be created above.
    if recordings_exists:
        domain_exists = conn.execute(text("""
            SELECT EXISTS (
                SELECT FROM information_schema.columns
                WHERE table_name = :table_name
                AND column_name = 'domain'
            )
        """), {"table_name": recordings_table}).scalar()

        if not domain_exists:
            # IF NOT EXISTS keeps the statement valid when subdomain is
            # already present but domain is not.
            conn.execute(text(f"""
                ALTER TABLE {recordings_table}
                ADD COLUMN IF NOT EXISTS domain VARCHAR(10),
                ADD COLUMN IF NOT EXISTS subdomain VARCHAR(10)
            """))
            conn.commit()
            logger.info(f"Added domain and subdomain columns to {recordings_table}")
_RECORDINGS_PAGE_SIZE = 10
_RECORDING_STATUS_FILTERS = ('verified', 'pending', 'rejected')
_RECORDINGS_TABLE_PREFIX = 'recordings_'
# Characters accepted in a language code. The code is spliced into table
# names, which cannot be bound as SQL parameters, so it must be whitelisted.
_SAFE_LANGUAGE_CHARS = frozenset(
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    '0123456789_'
)


def _is_safe_language_code(language):
    """Return True if ``language`` is non-empty and only letters, digits or '_'."""
    return bool(language) and all(ch in _SAFE_LANGUAGE_CHARS for ch in language)


def _current_user_is_moderator():
    """Return True if the session user carries a truthy ``is_moderator`` flag."""
    return bool(session.get('user', {}).get('is_moderator', False))


def _resolve_within_datasets(relative_path):
    """Return ``datasets/<relative_path>``, or None if it escapes ``datasets``."""
    base_dir = Path('datasets')
    candidate = base_dir / relative_path
    try:
        candidate.resolve().relative_to(base_dir.resolve())
    except ValueError:
        return None
    return candidate


def _fetch_recordings_page(conn, language_code, where_clause, count_where,
                           filter_params, limit, offset):
    """Fetch one slice of recordings for a language plus the filtered row count.

    ``language_code`` must already be validated; it is interpolated into the
    table names.

    Returns:
        A ``(rows, total)`` tuple.
    """
    rows = conn.execute(text(f"""
        SELECT r.*, COALESCE(t.transcription_text, '') as transcription
        FROM recordings_{language_code} r
        LEFT JOIN transcriptions_{language_code} t
            ON r.transcription_id = t.transcription_id
        {where_clause}
        ORDER BY r.id
        LIMIT :limit OFFSET :offset
    """), {**filter_params, "limit": limit, "offset": offset}).fetchall()

    total = conn.execute(text(f"""
        SELECT COUNT(*) FROM recordings_{language_code} WHERE {count_where}
    """), filter_params).scalar() or 0

    return rows, total


@validation.route('/api/recordings', methods=['GET'])
@login_required
def get_recordings():
    """Return one page of recordings as JSON, optionally filtered.

    Query parameters: ``page`` (1-based, values below 1 are treated as 1),
    ``language``, ``status`` (``verified``/``pending``/``rejected``; anything
    else means no status filter), ``domain`` and ``subdomain``. Without
    ``language`` every ``recordings_*`` table is queried and the merged rows
    are ordered by id.

    Returns:
        JSON with ``recordings``, ``total``, ``domain`` and ``subdomain``;
        403 for non-moderators, 400 for an invalid language, 500 on failure.
    """
    if not _current_user_is_moderator():
        return jsonify({'error': 'Unauthorized'}), 403

    # A page below 1 would produce a negative OFFSET, which the database rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    language = request.args.get('language', '')
    status = request.args.get('status', 'all')
    domain = request.args.get('domain', '')
    subdomain = request.args.get('subdomain', '')

    if language and not _is_safe_language_code(language):
        return jsonify({'error': 'Invalid language'}), 400

    try:
        offset = (page - 1) * _RECORDINGS_PAGE_SIZE

        filter_params = {}
        where_conditions = []
        count_conditions = []

        if status in _RECORDING_STATUS_FILTERS:
            where_conditions.append("r.status = :status")
            count_conditions.append("status = :status")
            filter_params["status"] = status

        for column, value in (('domain', domain), ('subdomain', subdomain)):
            if value:
                where_conditions.append(f"r.{column} = :{column}")
                count_conditions.append(f"{column} = :{column}")
                filter_params[column] = value

        where_clause = f"WHERE {' AND '.join(where_conditions)}" if where_conditions else ""
        count_where = ' AND '.join(count_conditions) if count_conditions else "true"

        with engine.connect() as conn:
            if language:
                ensure_language_tables(conn, language)
                page_rows, total = _fetch_recordings_page(
                    conn, language, where_clause, count_where, filter_params,
                    limit=_RECORDINGS_PAGE_SIZE, offset=offset,
                )
            else:
                table_rows = conn.execute(text("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_name LIKE 'recordings_%'
                    ORDER BY table_name
                """)).fetchall()

                merged_rows = []
                total = 0
                for (table_name,) in table_rows:
                    # '_' is a LIKE wildcard, so re-check the literal prefix.
                    if not table_name.startswith(_RECORDINGS_TABLE_PREFIX):
                        continue
                    language_code = table_name[len(_RECORDINGS_TABLE_PREFIX):]
                    if not _is_safe_language_code(language_code):
                        continue

                    ensure_language_tables(conn, language_code)
                    # Pagination happens after the merge, so each table must
                    # contribute its first offset + page-size rows.
                    rows, count = _fetch_recordings_page(
                        conn, language_code, where_clause, count_where,
                        filter_params,
                        limit=offset + _RECORDINGS_PAGE_SIZE, offset=0,
                    )
                    merged_rows.extend(rows)
                    total += count

                merged_rows.sort(key=lambda row: row.id)
                page_rows = merged_rows[offset:offset + _RECORDINGS_PAGE_SIZE]

        recordings = []
        for row in page_rows:
            recording_data = dict(row._mapping)
            if not recording_data.get('audio_path'):
                # Fall back to a path built from the language and filename.
                recording_data['audio_path'] = (
                    f"{recording_data.get('language', 'unknown')}/audio/"
                    f"{recording_data.get('audio_filename', '')}"
                )
            recordings.append(recording_data)

        return jsonify({
            'recordings': recordings,
            'total': total,
            'domain': domain,
            'subdomain': subdomain
        })
    except Exception as e:
        logger.error(f"Error getting recordings: {str(e)}")
        return jsonify({'error': str(e)}), 500


@validation.route('/api/verify/<path:recording_id>', methods=['POST'])
@login_required
def verify_recording(recording_id):
    """Mark a recording as verified or rejected.

    Args:
        recording_id: Slash-separated path with at least three parts; the
            first part is the language and the last is the audio filename.

    The JSON body may carry a ``verify`` flag; a falsy or missing flag rejects
    the recording. The moderator's matching row in ``validation_assignments``
    is updated in the same transaction.

    Returns:
        JSON ``{'status': 'success'}``, or an error with status 400, 403,
        404 or 500.
    """
    if not _current_user_is_moderator():
        return jsonify({'error': 'Unauthorized'}), 403

    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Never guess a verdict from a missing or malformed body.
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        verify = data.get('verify', False)

        parts = recording_id.split('/')
        if len(parts) < 3:
            return jsonify({'error': 'Invalid recording ID format'}), 400

        language = parts[0]
        filename = parts[-1]
        if not _is_safe_language_code(language):
            return jsonify({'error': 'Invalid recording ID format'}), 400

        user_id = session['user']['id']

        with engine.begin() as conn:
            result = conn.execute(text(f"""
                UPDATE recordings_{language}
                SET status = :status,
                    verified_by = :verified_by
                WHERE audio_filename = :filename
                RETURNING id
            """), {
                "status": 'verified' if verify else 'rejected',
                "verified_by": user_id,
                "filename": filename
            })

            rec_id = result.scalar()
            if not rec_id:
                return jsonify({'error': 'Recording not found'}), 404

            conn.execute(text("""
                UPDATE validation_assignments
                SET status = :status
                WHERE assigned_to = :user_id
                AND recording_id = :rec_id
                AND language = :language
            """), {
                "status": 'completed_verified' if verify else 'completed_rejected',
                "user_id": user_id,
                "rec_id": rec_id,
                "language": language
            })

        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error verifying recording: {str(e)}")
        return jsonify({'error': str(e)}), 500


@validation.route('/api/audio/<path:filename>')
@login_required
def serve_audio(filename):
    """Serve an audio file stored under the ``datasets`` directory.

    Args:
        filename: Path of the file relative to ``datasets``.

    Responds with 404 when the path leaves ``datasets``, the file does not
    exist, or sending it fails.
    """
    audio_path = _resolve_within_datasets(filename)
    if audio_path is None or not audio_path.is_file():
        logger.error(f"Error serving audio: Audio file not found: {filename}")
        abort(404)

    try:
        return send_from_directory(
            str(audio_path.parent), audio_path.name, as_attachment=False
        )
    except Exception as e:
        logger.error(f"Error serving audio: {e}")
        abort(404)


@validation.route('/api/delete/<path:recording_id>', methods=['DELETE'])
@login_required
def delete_recording(recording_id):
    """Delete a non-verified recording row and its audio file.

    Args:
        recording_id: Slash-separated path with at least three parts; the
            first part is the language and the last is the audio filename.
            The same value, relative to ``datasets``, locates the audio file.

    Returns:
        JSON success message, or an error with status 400, 403, 404 or 500.
    """
    if not _current_user_is_moderator():
        return jsonify({'error': 'Unauthorized'}), 403

    try:
        parts = recording_id.split('/')
        if len(parts) < 3:
            return jsonify({'error': 'Invalid recording ID format'}), 400

        language = parts[0]
        filename = parts[-1]
        audio_path = _resolve_within_datasets(recording_id)
        if audio_path is None or not _is_safe_language_code(language):
            return jsonify({'error': 'Invalid recording ID format'}), 400

        # Check and delete in one transaction so the status cannot change
        # between the two statements.
        with engine.begin() as conn:
            recording = conn.execute(text(f"""
                SELECT status
                FROM recordings_{language}
                WHERE audio_filename = :filename
            """), {"filename": filename}).first()

            if not recording:
                return jsonify({'error': 'Recording not found'}), 404

            if recording.status == 'verified':
                return jsonify({
                    'error': 'Cannot delete verified recording. Please unverify first.'
                }), 400

            conn.execute(text(f"""
                DELETE FROM recordings_{language}
                WHERE audio_filename = :filename
            """), {"filename": filename})

        # Remove the file only after the row deletion has been committed.
        if audio_path.exists():
            try:
                os.remove(audio_path)
                audio_dir = audio_path.parent
                if not any(audio_dir.iterdir()):
                    # rmdir refuses a non-empty directory, so a file added
                    # concurrently is never removed.
                    audio_dir.rmdir()
            except Exception as e:
                logger.error(f"Error deleting audio file: {e}")

        return jsonify({'status': 'success', 'message': 'Recording deleted successfully'})
    except Exception as e:
        logger.error(f"Error deleting recording: {str(e)}")
        return jsonify({'error': str(e)}), 500


@validation.route('/api/next_recording', methods=['GET'])
@login_required
def get_next_recording():
    """Assign the next recording to the current moderator and return it.

    Query parameters: ``language`` (required), ``domain`` and ``subdomain``,
    which are passed through to ``assign_recording``.

    Returns:
        JSON with ``status`` ``success`` and the recording, or ``status``
        ``no_recordings`` when the table is missing, nothing is assignable,
        or an error occurs; 400/403 for bad input or non-moderators.
    """
    if not _current_user_is_moderator():
        return jsonify({'error': 'Unauthorized'}), 403

    language = request.args.get('language', '')
    domain = request.args.get('domain', '')
    subdomain = request.args.get('subdomain', '')

    if not language:
        return jsonify({'error': 'Language is required'}), 400
    if not _is_safe_language_code(language):
        return jsonify({'error': 'Invalid language'}), 400

    try:
        with engine.connect() as conn:
            if not table_exists(conn, f"recordings_{language}"):
                return jsonify({
                    'status': 'no_recordings',
                    'message': 'No recordings available for validation'
                })

        # assign_recording comes from database_manager and receives the
        # domain/subdomain filters.
        recording = assign_recording(language, session['user']['id'], domain, subdomain)

        if not recording:
            return jsonify({
                'status': 'no_recordings',
                'message': 'No recordings available for validation'
            })

        return jsonify({
            'status': 'success',
            'recording': dict(recording)
        })
    except Exception as e:
        logger.error(f"Error getting next recording: {str(e)}")
        return jsonify({
            'status': 'no_recordings',
            'message': 'No recordings available for validation'
        })