"""Audio dataset download API.

Serves a small HTML download interface plus two JSON/file endpoints:

* ``GET /stats``    — aggregate statistics over recordings matching filters.
* ``GET /download`` — zip archive (metadata.jsonl + audio files) of matches.

Metadata lives in one Parquet file per language under ``datasets/<lang>/``;
audio files live under ``datasets/<lang>/audio/``.
"""

import asyncio
import json
import os
import tempfile
import zipfile
from collections import Counter
from typing import Any, Dict, List, Optional

import fastapi
import pyarrow.parquet as pq
from fastapi import HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.background import BackgroundTask

from language_config import get_language_name, LANGUAGES


class TempFileManager:
    """Registry of per-request temporary directories.

    Directories are keyed by a request id so they can be cleaned up by a
    background task *after* the response has been streamed to the client.
    """

    # request-id -> TemporaryDirectory, shared process-wide.
    _temp_dirs: Dict[str, tempfile.TemporaryDirectory] = {}

    @classmethod
    def create_temp_dir(cls, key: str) -> str:
        """Create a temporary directory, register it under *key*, and return its path."""
        temp_dir = tempfile.TemporaryDirectory()
        cls._temp_dirs[key] = temp_dir
        return temp_dir.name

    @classmethod
    async def cleanup_temp_dir(cls, key: str) -> None:
        """Remove and clean up the directory registered under *key*.

        The blocking filesystem removal runs in a worker thread so the event
        loop is not stalled; an unknown key is silently ignored.
        """
        if key in cls._temp_dirs:
            temp_dir = cls._temp_dirs.pop(key)
            await asyncio.to_thread(temp_dir.cleanup)


class AudioDownloadService:
    """Helpers for locating dataset files and reading Parquet metadata."""

    # Root directory containing one sub-directory per language code.
    BASE_DATASET_DIR = "datasets"

    @classmethod
    def get_metadata_file_path(cls, language: str) -> str:
        """Get the path to the metadata Parquet file for a given language."""
        return os.path.join(cls.BASE_DATASET_DIR, language, f"{language}.parquet")

    @classmethod
    def get_audio_file_path(cls, language: str, audio_filename: str) -> str:
        """Get the full path to an audio file."""
        return os.path.join(cls.BASE_DATASET_DIR, language, "audio", audio_filename)

    @classmethod
    def read_parquet_metadata(
        cls, language: str, filters: Optional[List[tuple]] = None
    ) -> List[Dict[str, Any]]:
        """Read metadata from a Parquet file with optional filtering.

        :param language: Language code (must be a key of ``LANGUAGES``).
        :param filters: Optional list of ``(column, op, value)`` tuples,
            passed straight through to :func:`pyarrow.parquet.read_table`.
        :return: List of metadata dictionaries, one per recording row.
        :raises HTTPException: 400 for an unknown language code, 404 when the
            per-language metadata file does not exist.
        """
        file_path = cls.get_metadata_file_path(language)
        if language not in LANGUAGES:
            raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail=f"Metadata file not found for language: {language}")
        table = pq.read_table(file_path, filters=filters)
        return table.to_pylist()


# Create templates and static directories if they don't exist
os.makedirs("templates", exist_ok=True)
os.makedirs("static", exist_ok=True)

app = fastapi.FastAPI(
    title="Audio Dataset Download API",
    description="API for downloading audio recordings and metadata",
)

# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")

# Setup Jinja2 templates
templates = Jinja2Templates(directory="templates")


def _parse_duration(value: Optional[str], param_name: str) -> Optional[float]:
    """Parse a duration query string into a float.

    Returns ``None`` for a missing/blank value. Raises a 400 (instead of
    letting ``ValueError`` surface as a 500) for non-numeric input.
    """
    if not value or not value.strip():
        return None
    try:
        return float(value)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid {param_name}: {value}")


def _build_filters(
    user_id: Optional[str], username: Optional[str],
    speaker_id: Optional[str], speaker_name: Optional[str],
    audio_filename: Optional[str], audio_path: Optional[str],
    sampling_rate: Optional[str],
    duration_min: Optional[str], duration_max: Optional[str],
    language: Optional[str], gender: Optional[str], country: Optional[str],
    state: Optional[str], city: Optional[str], age_group: Optional[str],
    accent: Optional[str],
    verified: Optional[bool],
    timestamp_min: Optional[str], timestamp_max: Optional[str],
) -> List[tuple]:
    """Translate the shared query parameters of /stats and /download into a
    list of pyarrow ``(column, op, value)`` filter tuples.

    :raises HTTPException: 400 for an invalid language code or a
        non-numeric duration bound.
    """
    filters: List[tuple] = []

    # Simple equality filters (only applied when the parameter is non-empty).
    for column, value in (
        ('user_id', user_id),
        ('username', username),
        ('speaker_id', speaker_id),
        ('speaker_name', speaker_name),
        ('audio_filename', audio_filename),
        ('audio_path', audio_path),
        # NOTE(review): compares the raw query string against the column —
        # assumes the Parquet 'sampling_rate' column is string-typed; confirm.
        ('sampling_rate', sampling_rate),
    ):
        if value:
            filters.append((column, '=', value))

    # Duration range (strings from the query layer, validated here).
    duration_min_val = _parse_duration(duration_min, "duration_min")
    duration_max_val = _parse_duration(duration_max, "duration_max")
    if duration_min_val is not None:
        filters.append(('duration', '>=', duration_min_val))
    if duration_max_val is not None:
        filters.append(('duration', '<=', duration_max_val))

    # Language and demographic filters.
    if language and language not in LANGUAGES:
        raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")
    for column, value in (
        ('language', language),
        ('gender', gender),
        ('country', country),
        ('state', state),
        ('city', city),
        ('age_group', age_group),
        ('accent', accent),
    ):
        if value:
            filters.append((column, '=', value))

    # Verification flag and timestamp range (string comparison — relies on a
    # lexicographically sortable timestamp format such as ISO 8601).
    if verified is not None:
        filters.append(('verified', '=', verified))
    if timestamp_min:
        filters.append(('timestamp', '>=', timestamp_min))
    if timestamp_max:
        filters.append(('timestamp', '<=', timestamp_max))

    return filters


async def _read_language_metadata(
    lang: str, filters: List[tuple]
) -> List[Dict[str, Any]]:
    """Read one language's metadata in a worker thread (the Parquet read is
    blocking and must not stall the event loop), forcing a language filter
    when the caller did not already supply one."""
    lang_filters = list(filters)
    if not any(f[0] == 'language' for f in lang_filters):
        lang_filters.append(('language', '=', lang))
    return await asyncio.to_thread(
        AudioDownloadService.read_parquet_metadata, lang, lang_filters
    )


def _write_dataset_zip(
    zip_path: str, tmpdirname: str, matched_recordings: List[Dict[str, Any]]
) -> None:
    """Write ``metadata.jsonl`` plus every existing audio file referenced by
    *matched_recordings* into the zip at *zip_path* (blocking; run off-loop)."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Metadata first: one JSON object per line.
        metadata_path = os.path.join(tmpdirname, "metadata.jsonl")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            for record in matched_recordings:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        zipf.write(metadata_path, "metadata.jsonl")
        # Audio files, laid out as <language>/audio/<filename>; files that
        # are missing on disk are skipped rather than failing the archive.
        for record in matched_recordings:
            audio_filename = record['audio_filename']
            language = record['language']
            full_audio_path = AudioDownloadService.get_audio_file_path(language, audio_filename)
            if os.path.exists(full_audio_path):
                zipf.write(full_audio_path, os.path.join(language, "audio", audio_filename))


@app.get("/", response_class=HTMLResponse)
async def get_download_interface(request: Request):
    """Serve the download interface page."""
    return templates.TemplateResponse(
        "download.html", {"request": request, "languages": LANGUAGES}
    )


@app.get("/stats")
async def get_stats(
    # User and Speaker Identification
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,
    # Audio File Metadata
    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,
    # Language and Demographic Filters
    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,
    # Verification and Timestamp
    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,
    # Misc
    max_recordings: Optional[int] = 100,
):
    """Get statistics about the matching recordings without downloading."""
    filters = _build_filters(
        user_id, username, speaker_id, speaker_name,
        audio_filename, audio_path, sampling_rate,
        duration_min, duration_max,
        language, gender, country, state, city, age_group, accent,
        verified, timestamp_min, timestamp_max,
    )

    # A specific language restricts the search; otherwise scan every language.
    languages_to_search = [language] if language else list(LANGUAGES.keys())

    total_recordings = 0
    total_duration = 0.0
    languages_found = set()
    genders: Counter = Counter()
    age_groups: Counter = Counter()
    accents: Counter = Counter()

    for lang in languages_to_search:
        try:
            recordings = await _read_language_metadata(lang, filters)
        except HTTPException:
            # Skip languages with no/invalid metadata file.
            continue
        if not recordings:
            continue
        languages_found.add(lang)
        total_recordings += len(recordings)
        for rec in recordings:
            # 'or 0' guards against a null duration in the Parquet row,
            # which would otherwise raise TypeError on +=.
            total_duration += rec.get('duration') or 0
            genders[rec.get('gender', 'Unknown')] += 1
            age_groups[rec.get('age_group', 'Unknown')] += 1
            accents[rec.get('accent', 'Unknown')] += 1

    return {
        "total_recordings": total_recordings,
        "total_duration_hours": round(total_duration / 3600, 2),
        "languages": list(languages_found),
        "language_count": len(languages_found),
        "gender_distribution": dict(genders),
        "age_distribution": dict(age_groups),
        "accent_distribution": dict(accents),
    }


@app.get("/download")
async def bulk_download(
    # User and Speaker Identification
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,
    # Audio File Metadata
    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,
    # Language and Demographic Filters
    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,
    # Verification and Timestamp
    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,
    # Misc
    max_recordings: Optional[int] = 100,
):
    """
    Bulk download audio files and metadata based on comprehensive filtering criteria.

    Returns a zip archive containing ``metadata.jsonl`` plus the matching
    audio files; the temporary directory holding the archive is removed by a
    background task after the response is sent.
    """
    filters = _build_filters(
        user_id, username, speaker_id, speaker_name,
        audio_filename, audio_path, sampling_rate,
        duration_min, duration_max,
        language, gender, country, state, city, age_group, accent,
        verified, timestamp_min, timestamp_max,
    )

    languages_to_search = [language] if language else list(LANGUAGES.keys())

    # Collect matching metadata rows, capped at max_recordings overall.
    matched_recordings: List[Dict[str, Any]] = []
    for lang in languages_to_search:
        try:
            recordings = await _read_language_metadata(lang, filters)
        except HTTPException:
            # Skip languages without metadata.
            continue
        matched_recordings.extend(recordings[:max_recordings])
        if len(matched_recordings) >= max_recordings:
            matched_recordings = matched_recordings[:max_recordings]
            break

    if not matched_recordings:
        raise HTTPException(status_code=404, detail="No recordings match the given criteria")

    # Per-request workspace, cleaned up after the response (or on failure).
    request_id = os.urandom(16).hex()
    tmpdirname = TempFileManager.create_temp_dir(request_id)
    zip_path = os.path.join(tmpdirname, "dataset.zip")

    try:
        # Zip construction is blocking I/O; run it in a worker thread.
        await asyncio.to_thread(_write_dataset_zip, zip_path, tmpdirname, matched_recordings)
        if not os.path.exists(zip_path):
            raise HTTPException(status_code=500, detail="Failed to create zip file")
    except Exception:
        # Don't leak the temp directory when archive creation fails.
        await TempFileManager.cleanup_temp_dir(request_id)
        raise

    # Return zip file with async cleanup once the response has been sent.
    return FileResponse(
        path=zip_path,
        media_type="application/zip",
        filename="dataset.zip",
        background=BackgroundTask(TempFileManager.cleanup_temp_dir, request_id),
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)