|
from flask import Blueprint, render_template, jsonify, send_from_directory, abort, session, request, redirect, url_for, current_app
|
|
from pathlib import Path
|
|
import pandas as pd
|
|
import logging
|
|
from functools import wraps
|
|
from language_config import get_all_languages
|
|
from filelock import FileLock
|
|
import threading
|
|
from database_manager import (
|
|
engine,
|
|
assign_recording,
|
|
complete_assignment,
|
|
table_exists
|
|
)
|
|
from sqlalchemy import text
|
|
import os
|
|
import shutil
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
validation = Blueprint('validation', __name__, url_prefix='/validation')
|
|
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not session.get('user'):
|
|
return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@validation.route('/')
|
|
@login_required
|
|
def validate():
|
|
try:
|
|
|
|
user = session.get('user', {})
|
|
|
|
|
|
try:
|
|
pb_user = current_app.pb.collection('users').get_one(user['id'])
|
|
|
|
user['role'] = getattr(pb_user, 'role', user.get('role', ''))
|
|
user['is_moderator'] = user['role'] in ['moderator', 'admin']
|
|
session['user'] = user
|
|
except Exception as e:
|
|
logger.error(f"Error updating user role: {e}")
|
|
|
|
pass
|
|
|
|
|
|
if not user.get('is_moderator', False):
|
|
return render_template('error.html',
|
|
error_code=403,
|
|
error_message="You don't have permission to access this page"), 403
|
|
|
|
languages = get_all_languages()
|
|
return render_template('validate.html', languages=languages)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in validate route: {e}")
|
|
return render_template('error.html',
|
|
error_code=500,
|
|
error_message="An internal server error occurred"), 500
|
|
|
|
def ensure_language_tables(conn, language):
|
|
"""Ensure both recordings and transcriptions tables exist for the language"""
|
|
|
|
tables_query = text("""
|
|
SELECT EXISTS (
|
|
SELECT FROM information_schema.tables
|
|
WHERE table_name = :table_name
|
|
)
|
|
""")
|
|
|
|
|
|
recordings_exists = conn.execute(tables_query,
|
|
{"table_name": f"recordings_{language}"}).scalar()
|
|
|
|
|
|
transcriptions_exists = conn.execute(tables_query,
|
|
{"table_name": f"transcriptions_{language}"}).scalar()
|
|
|
|
|
|
if not recordings_exists or not transcriptions_exists:
|
|
logger.info(f"Creating missing tables for language: {language}")
|
|
|
|
if not recordings_exists:
|
|
conn.execute(text(f"""
|
|
CREATE TABLE IF NOT EXISTS recordings_{language} (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id VARCHAR,
|
|
audio_filename VARCHAR,
|
|
transcription_id INTEGER,
|
|
speaker_name VARCHAR,
|
|
speaker_id VARCHAR,
|
|
audio_path VARCHAR,
|
|
sampling_rate INTEGER,
|
|
duration FLOAT,
|
|
language VARCHAR(2),
|
|
gender VARCHAR(10),
|
|
country VARCHAR,
|
|
state VARCHAR,
|
|
city VARCHAR,
|
|
status VARCHAR(20) DEFAULT 'pending',
|
|
verified_by VARCHAR,
|
|
username VARCHAR,
|
|
age_group VARCHAR,
|
|
accent VARCHAR,
|
|
domain VARCHAR(10),
|
|
subdomain VARCHAR(10)
|
|
)
|
|
"""))
|
|
|
|
if not transcriptions_exists:
|
|
conn.execute(text(f"""
|
|
CREATE TABLE IF NOT EXISTS transcriptions_{language} (
|
|
transcription_id SERIAL PRIMARY KEY,
|
|
user_id VARCHAR(255),
|
|
transcription_text TEXT NOT NULL,
|
|
uploaded_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
conn.commit()
|
|
else:
|
|
|
|
domain_exists = conn.execute(text(f"""
|
|
SELECT EXISTS (
|
|
SELECT FROM information_schema.columns
|
|
WHERE table_name = 'recordings_{language}'
|
|
AND column_name = 'domain'
|
|
)
|
|
""")).scalar()
|
|
|
|
if not domain_exists:
|
|
|
|
conn.execute(text(f"""
|
|
ALTER TABLE recordings_{language}
|
|
ADD COLUMN domain VARCHAR(10),
|
|
ADD COLUMN subdomain VARCHAR(10)
|
|
"""))
|
|
conn.commit()
|
|
logger.info(f"Added domain and subdomain columns to recordings_{language}")
|
|
|
|
@validation.route('/api/recordings', methods=['GET'])
|
|
@login_required
|
|
def get_recordings():
|
|
if not session.get('user', {}).get('is_moderator', False):
|
|
return jsonify({'error': 'Unauthorized'}), 403
|
|
|
|
page = request.args.get('page', 1, type=int)
|
|
language = request.args.get('language', '')
|
|
status = request.args.get('status', 'all')
|
|
domain = request.args.get('domain', '')
|
|
subdomain = request.args.get('subdomain', '')
|
|
|
|
try:
|
|
offset = (page - 1) * 10
|
|
|
|
query = """
|
|
SELECT r.*, COALESCE(t.transcription_text, '') as transcription
|
|
FROM recordings_{} r
|
|
LEFT JOIN transcriptions_{} t ON r.transcription_id = t.transcription_id
|
|
"""
|
|
|
|
count_query = """
|
|
SELECT COUNT(*) FROM recordings_{} WHERE {}
|
|
"""
|
|
|
|
where_conditions = []
|
|
count_conditions = []
|
|
|
|
if status == 'verified':
|
|
where_conditions.append("r.status = 'verified'")
|
|
count_conditions.append("status = 'verified'")
|
|
elif status == 'pending':
|
|
where_conditions.append("r.status = 'pending'")
|
|
count_conditions.append("status = 'pending'")
|
|
elif status == 'rejected':
|
|
where_conditions.append("r.status = 'rejected'")
|
|
count_conditions.append("status = 'rejected'")
|
|
|
|
if domain:
|
|
where_conditions.append("r.domain = :domain")
|
|
count_conditions.append("domain = :domain")
|
|
|
|
if subdomain:
|
|
where_conditions.append("r.subdomain = :subdomain")
|
|
count_conditions.append("subdomain = :subdomain")
|
|
|
|
where_clause = f" WHERE {' AND '.join(where_conditions)}" if where_conditions else ""
|
|
count_where = f"{' AND '.join(count_conditions)}" if count_conditions else "true"
|
|
|
|
query += where_clause + " ORDER BY r.id LIMIT 10 OFFSET :offset"
|
|
|
|
with engine.connect() as conn:
|
|
if language:
|
|
ensure_language_tables(conn, language)
|
|
|
|
full_query = text(query.format(language, language))
|
|
params = {"offset": offset}
|
|
|
|
if domain:
|
|
params["domain"] = domain
|
|
if subdomain:
|
|
params["subdomain"] = subdomain
|
|
|
|
result = conn.execute(full_query, params)
|
|
|
|
count_params = {}
|
|
if domain:
|
|
count_params["domain"] = domain
|
|
if subdomain:
|
|
count_params["subdomain"] = subdomain
|
|
|
|
total_result = conn.execute(text(
|
|
count_query.format(language, count_where)
|
|
), count_params)
|
|
|
|
total = total_result.scalar()
|
|
else:
|
|
all_recordings = []
|
|
total = 0
|
|
|
|
tables_query = text("""
|
|
SELECT table_name
|
|
FROM information_schema.tables
|
|
WHERE table_name LIKE 'recordings_%'
|
|
""")
|
|
tables = conn.execute(tables_query).fetchall()
|
|
|
|
for (table_name,) in tables:
|
|
language_code = table_name.replace('recordings_', '')
|
|
ensure_language_tables(conn, language_code)
|
|
language_query = text(query.format(language_code, language_code))
|
|
|
|
params = {"offset": offset}
|
|
if domain:
|
|
params["domain"] = domain
|
|
if subdomain:
|
|
params["subdomain"] = subdomain
|
|
|
|
recordings = conn.execute(language_query, params).fetchall()
|
|
|
|
count_params = {}
|
|
if domain:
|
|
count_params["domain"] = domain
|
|
if subdomain:
|
|
count_params["subdomain"] = subdomain
|
|
|
|
count_result = conn.execute(text(
|
|
count_query.format(language_code, count_where)
|
|
), count_params)
|
|
|
|
total += count_result.scalar() or 0
|
|
|
|
all_recordings.sort(key=lambda x: x.id)
|
|
result = all_recordings[offset:offset+10]
|
|
|
|
recordings = []
|
|
for row in result:
|
|
recording_data = dict(row._mapping)
|
|
if 'audio_path' not in recording_data or not recording_data['audio_path']:
|
|
recording_data['audio_path'] = f"{recording_data.get('language', 'unknown')}/audio/{recording_data.get('audio_filename', '')}"
|
|
recordings.append(recording_data)
|
|
|
|
return jsonify({
|
|
'recordings': recordings,
|
|
'total': total,
|
|
'domain': domain,
|
|
'subdomain': subdomain
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting recordings: {str(e)}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@validation.route('/api/verify/<path:recording_id>', methods=['POST'])
|
|
@login_required
|
|
def verify_recording(recording_id):
|
|
if not session.get('user', {}).get('is_moderator', False):
|
|
return jsonify({'error': 'Unauthorized'}), 403
|
|
|
|
try:
|
|
data = request.get_json()
|
|
verify = data.get('verify', False)
|
|
|
|
parts = recording_id.split('/')
|
|
if len(parts) < 3:
|
|
return jsonify({'error': 'Invalid recording ID format'}), 400
|
|
|
|
language = parts[0]
|
|
filename = parts[-1]
|
|
|
|
|
|
with engine.begin() as conn:
|
|
result = conn.execute(text(f"""
|
|
UPDATE recordings_{language}
|
|
SET status = :status,
|
|
verified_by = :verified_by
|
|
WHERE audio_filename = :filename
|
|
RETURNING id
|
|
"""), {
|
|
"status": 'verified' if verify else 'rejected',
|
|
"verified_by": session['user']['id'],
|
|
"filename": filename
|
|
})
|
|
|
|
rec_id = result.scalar()
|
|
if not rec_id:
|
|
return jsonify({'error': 'Recording not found'}), 404
|
|
|
|
conn.execute(text("""
|
|
UPDATE validation_assignments
|
|
SET status = :status
|
|
WHERE assigned_to = :user_id
|
|
AND recording_id = :rec_id
|
|
AND language = :language
|
|
"""), {
|
|
"status": 'completed_verified' if verify else 'completed_rejected',
|
|
"user_id": session['user']['id'],
|
|
"rec_id": rec_id,
|
|
"language": language
|
|
})
|
|
|
|
return jsonify({'status': 'success'})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error verifying recording: {str(e)}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@validation.route('/api/audio/<path:filename>')
|
|
@login_required
|
|
def serve_audio(filename):
|
|
try:
|
|
audio_path = Path('datasets') / filename
|
|
if not audio_path.exists():
|
|
raise FileNotFoundError(f"Audio file not found: {filename}")
|
|
directory = str(audio_path.parent)
|
|
file_name = audio_path.name
|
|
return send_from_directory(directory, file_name, as_attachment=False)
|
|
except Exception as e:
|
|
logger.error(f"Error serving audio: {e}")
|
|
abort(404)
|
|
|
|
@validation.route('/api/delete/<path:recording_id>', methods=['DELETE'])
|
|
@login_required
|
|
def delete_recording(recording_id):
|
|
if not session.get('user', {}).get('is_moderator', False):
|
|
return jsonify({'error': 'Unauthorized'}), 403
|
|
|
|
try:
|
|
parts = recording_id.split('/')
|
|
if len(parts) < 3:
|
|
return jsonify({'error': 'Invalid recording ID format'}), 400
|
|
|
|
language = parts[0]
|
|
filename = parts[-1]
|
|
|
|
with engine.connect() as conn:
|
|
verify_query = text(f"""
|
|
SELECT status FROM recordings_{language}
|
|
WHERE audio_filename = :filename
|
|
""")
|
|
result = conn.execute(verify_query, {"filename": filename}).first()
|
|
|
|
if not result:
|
|
return jsonify({'error': 'Recording not found'}), 404
|
|
|
|
if result.status == 'verified':
|
|
return jsonify({
|
|
'error': 'Cannot delete verified recording. Please unverify first.'
|
|
}), 400
|
|
|
|
with engine.begin() as conn:
|
|
query = text(f"""
|
|
SELECT * FROM recordings_{language}
|
|
WHERE audio_filename = :filename
|
|
""")
|
|
recording = conn.execute(query, {"filename": filename}).first()
|
|
|
|
if not recording:
|
|
return jsonify({'error': 'Recording not found'}), 404
|
|
|
|
delete_query = text(f"""
|
|
DELETE FROM recordings_{language}
|
|
WHERE audio_filename = :filename
|
|
""")
|
|
conn.execute(delete_query, {"filename": filename})
|
|
|
|
audio_path = Path('datasets') / recording_id
|
|
if audio_path.exists():
|
|
try:
|
|
os.remove(audio_path)
|
|
audio_dir = audio_path.parent
|
|
if not any(audio_dir.iterdir()):
|
|
shutil.rmtree(audio_dir)
|
|
except Exception as e:
|
|
logger.error(f"Error deleting audio file: {e}")
|
|
|
|
return jsonify({'status': 'success', 'message': 'Recording deleted successfully'})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting recording: {str(e)}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@validation.route('/api/next_recording', methods=['GET'])
|
|
@login_required
|
|
def get_next_recording():
|
|
if not session.get('user', {}).get('is_moderator', False):
|
|
return jsonify({'error': 'Unauthorized'}), 403
|
|
|
|
language = request.args.get('language', '')
|
|
domain = request.args.get('domain', '')
|
|
subdomain = request.args.get('subdomain', '')
|
|
|
|
if not language:
|
|
return jsonify({'error': 'Language is required'}), 400
|
|
|
|
try:
|
|
with engine.connect() as conn:
|
|
if not table_exists(conn, f"recordings_{language}"):
|
|
return jsonify({
|
|
'status': 'no_recordings',
|
|
'message': 'No recordings available for validation'
|
|
})
|
|
|
|
|
|
|
|
recording = assign_recording(language, session['user']['id'], domain, subdomain)
|
|
|
|
if not recording:
|
|
return jsonify({
|
|
'status': 'no_recordings',
|
|
'message': 'No recordings available for validation'
|
|
})
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'recording': dict(recording)
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting next recording: {str(e)}")
|
|
return jsonify({
|
|
'status': 'no_recordings',
|
|
'message': 'No recordings available for validation'
|
|
})
|
|
|