|
import asyncio
import json
import os
import tempfile
import zipfile
from collections import Counter
from typing import Any, Dict, List, Optional, Union

import fastapi
import pyarrow.parquet as pq
from fastapi import HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.background import BackgroundTask

from language_config import get_language_name, LANGUAGES
|
|
|
|
class TempFileManager:
|
|
_temp_dirs: Dict[str, tempfile.TemporaryDirectory] = {}
|
|
|
|
@classmethod
|
|
def create_temp_dir(cls, key: str) -> str:
|
|
temp_dir = tempfile.TemporaryDirectory()
|
|
cls._temp_dirs[key] = temp_dir
|
|
return temp_dir.name
|
|
|
|
@classmethod
|
|
async def cleanup_temp_dir(cls, key: str) -> None:
|
|
if key in cls._temp_dirs:
|
|
temp_dir = cls._temp_dirs.pop(key)
|
|
await asyncio.to_thread(temp_dir.cleanup)
|
|
|
|
class AudioDownloadService:
    """Helpers for locating dataset files on disk and reading per-language metadata."""

    # Root directory containing one sub-directory per language code.
    BASE_DATASET_DIR = "datasets"

    @classmethod
    def get_metadata_file_path(cls, language: str) -> str:
        """Get the path to the metadata Parquet file for a given language."""
        return os.path.join(cls.BASE_DATASET_DIR, language, f"{language}.parquet")

    @classmethod
    def get_audio_file_path(cls, language: str, audio_filename: str) -> str:
        """Get the full path to an audio file."""
        return os.path.join(cls.BASE_DATASET_DIR, language, "audio", audio_filename)

    @classmethod
    def read_parquet_metadata(cls, language: str,
                              filters: Optional[List[tuple]] = None) -> List[Dict[str, Any]]:
        """
        Read metadata from Parquet file with optional filtering.

        :param language: Language code
        :param filters: Optional list of filter tuples for Parquet filtering
        :return: List of metadata dictionaries
        :raises HTTPException: 400 for an unknown language code, 404 when the
            metadata file does not exist on disk.
        """
        # Guard clauses first: reject unknown languages, then missing files.
        if language not in LANGUAGES:
            raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

        file_path = cls.get_metadata_file_path(language)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail=f"Metadata file not found for language: {language}")

        # Filters are pushed down into the Parquet read, so only matching
        # rows are materialized.
        return pq.read_table(file_path, filters=filters).to_pylist()
|
|
|
|
|
|
# Make sure the directories used below exist before the app mounts them.
for _required_dir in ("templates", "static"):
    os.makedirs(_required_dir, exist_ok=True)

app = fastapi.FastAPI(
    title="Audio Dataset Download API",
    description="API for downloading audio recordings and metadata",
)

# Static assets (JS/CSS) for the download interface.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates rendered by the HTML endpoints.
templates = Jinja2Templates(directory="templates")
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def get_download_interface(request: Request):
    """Serve the download interface page."""
    # Hand the full language map to the template so the UI can build its
    # language selector.
    context = {"request": request, "languages": LANGUAGES}
    return templates.TemplateResponse("download.html", context)
|
|
|
|
@app.get("/stats")
async def get_stats(
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,

    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,

    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,

    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,

    max_recordings: Optional[int] = 100
):
    """Get statistics about the matching recordings without downloading.

    All parameters are optional filters; string fields are matched for
    equality, ``duration_min``/``duration_max`` and
    ``timestamp_min``/``timestamp_max`` are range bounds.

    :return: Aggregate counts, total duration in hours, and gender/age/accent
        distributions over every matching recording.
    :raises HTTPException: 400 for an invalid language code or a non-numeric
        duration bound.

    NOTE(review): ``max_recordings`` is accepted for parity with /download
    but does not limit the statistics — this preserves existing behavior.
    """
    if language and language not in LANGUAGES:
        raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

    def _parse_duration(raw: Optional[str], field: str) -> Optional[float]:
        # Surface a 400 instead of an unhandled ValueError (HTTP 500) when
        # the client sends a non-numeric duration bound.
        if raw is None or not raw.strip():
            return None
        try:
            return float(raw)
        except ValueError:
            raise HTTPException(status_code=400,
                                detail=f"Invalid numeric value for {field}: {raw}")

    duration_min_val = _parse_duration(duration_min, "duration_min")
    duration_max_val = _parse_duration(duration_max, "duration_max")

    # Equality filters for every provided identity/technical/demographic field.
    filters = []
    for column, value in (
        ('user_id', user_id),
        ('username', username),
        ('speaker_id', speaker_id),
        ('speaker_name', speaker_name),
        ('audio_filename', audio_filename),
        ('audio_path', audio_path),
        ('sampling_rate', sampling_rate),
        ('language', language),
        ('gender', gender),
        ('country', country),
        ('state', state),
        ('city', city),
        ('age_group', age_group),
        ('accent', accent),
    ):
        if value:
            filters.append((column, '=', value))

    if duration_min_val is not None:
        filters.append(('duration', '>=', duration_min_val))
    if duration_max_val is not None:
        filters.append(('duration', '<=', duration_max_val))
    if verified is not None:
        filters.append(('verified', '=', verified))
    if timestamp_min:
        filters.append(('timestamp', '>=', timestamp_min))
    if timestamp_max:
        filters.append(('timestamp', '<=', timestamp_max))

    # Without an explicit language, aggregate across every known language.
    languages_to_search = [language] if language else list(LANGUAGES.keys())

    total_recordings = 0
    total_duration = 0.0
    languages_found = set()
    genders = Counter()
    age_groups = Counter()
    accents = Counter()

    for lang in languages_to_search:
        lang_filters = filters.copy()
        if not any(f[0] == 'language' for f in lang_filters):
            lang_filters.append(('language', '=', lang))

        try:
            recordings = AudioDownloadService.read_parquet_metadata(lang, lang_filters)
        except HTTPException:
            # Missing/invalid dataset for this language: skip it rather than
            # abort the whole aggregation.
            continue

        if not recordings:
            continue

        languages_found.add(lang)
        total_recordings += len(recordings)
        for rec in recordings:
            # `or 0` also tolerates an explicit None stored in the column,
            # which `rec.get('duration', 0)` would not.
            total_duration += rec.get('duration') or 0
            genders[rec.get('gender', 'Unknown')] += 1
            age_groups[rec.get('age_group', 'Unknown')] += 1
            accents[rec.get('accent', 'Unknown')] += 1

    return {
        "total_recordings": total_recordings,
        "total_duration_hours": round(total_duration / 3600, 2),
        "languages": list(languages_found),
        "language_count": len(languages_found),
        "gender_distribution": dict(genders),
        "age_distribution": dict(age_groups),
        "accent_distribution": dict(accents),
    }
|
|
|
|
@app.get("/download")
async def bulk_download(
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,

    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,

    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,

    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,

    max_recordings: Optional[int] = 100
):
    """
    Bulk download audio files and metadata based on comprehensive filtering criteria.

    Builds a ZIP archive containing ``metadata.jsonl`` (one JSON object per
    matched recording) plus every referenced audio file that exists on disk,
    laid out as ``<language>/audio/<filename>``. At most ``max_recordings``
    recordings are included.

    :raises HTTPException: 400 for an invalid language code or non-numeric
        duration bound, 404 when no recordings match, 500 if the archive
        could not be created.
    """
    if language and language not in LANGUAGES:
        raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

    def _parse_duration(raw: Optional[str], field: str) -> Optional[float]:
        # Surface a 400 instead of an unhandled ValueError (HTTP 500) when
        # the client sends a non-numeric duration bound.
        if raw is None or not raw.strip():
            return None
        try:
            return float(raw)
        except ValueError:
            raise HTTPException(status_code=400,
                                detail=f"Invalid numeric value for {field}: {raw}")

    duration_min_val = _parse_duration(duration_min, "duration_min")
    duration_max_val = _parse_duration(duration_max, "duration_max")

    # Equality filters for every provided identity/technical/demographic field.
    filters = []
    for column, value in (
        ('user_id', user_id),
        ('username', username),
        ('speaker_id', speaker_id),
        ('speaker_name', speaker_name),
        ('audio_filename', audio_filename),
        ('audio_path', audio_path),
        ('sampling_rate', sampling_rate),
        ('language', language),
        ('gender', gender),
        ('country', country),
        ('state', state),
        ('city', city),
        ('age_group', age_group),
        ('accent', accent),
    ):
        if value:
            filters.append((column, '=', value))

    if duration_min_val is not None:
        filters.append(('duration', '>=', duration_min_val))
    if duration_max_val is not None:
        filters.append(('duration', '<=', duration_max_val))
    if verified is not None:
        filters.append(('verified', '=', verified))
    if timestamp_min:
        filters.append(('timestamp', '>=', timestamp_min))
    if timestamp_max:
        filters.append(('timestamp', '<=', timestamp_max))

    # Without an explicit language, scan every known language.
    languages_to_search = [language] if language else list(LANGUAGES.keys())

    matched_recordings = []
    for lang in languages_to_search:
        lang_filters = filters.copy()
        if not any(f[0] == 'language' for f in lang_filters):
            lang_filters.append(('language', '=', lang))

        try:
            recordings = AudioDownloadService.read_parquet_metadata(lang, lang_filters)
        except HTTPException:
            # No dataset for this language: keep scanning the others.
            continue

        # Take only as many as still fit under the overall cap, rather than
        # slicing each language to the full cap and trimming afterwards.
        remaining = max_recordings - len(matched_recordings)
        matched_recordings.extend(recordings[:remaining])
        if len(matched_recordings) >= max_recordings:
            break

    if not matched_recordings:
        raise HTTPException(status_code=404, detail="No recordings match the given criteria")

    # Unpredictable key used both for the temp-dir registry and the
    # post-response cleanup task.
    request_id = os.urandom(16).hex()
    tmpdirname = TempFileManager.create_temp_dir(request_id)

    try:
        zip_path = os.path.join(tmpdirname, "dataset.zip")
        # NOTE(review): archive building is synchronous and will block the
        # event loop for large downloads — consider asyncio.to_thread.
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Write one JSON object per line with each recording's metadata.
            metadata_path = os.path.join(tmpdirname, "metadata.jsonl")
            with open(metadata_path, 'w', encoding='utf-8') as f:
                for record in matched_recordings:
                    f.write(json.dumps(record, ensure_ascii=False) + "\n")
            zipf.write(metadata_path, "metadata.jsonl")

            # Add every referenced audio file that actually exists; missing
            # files are silently skipped (metadata still lists them).
            for record in matched_recordings:
                # Local names must not shadow the endpoint's query parameters.
                rec_filename = record['audio_filename']
                rec_language = record['language']
                full_audio_path = AudioDownloadService.get_audio_file_path(rec_language, rec_filename)
                if os.path.exists(full_audio_path):
                    zipf.write(full_audio_path, os.path.join(rec_language, "audio", rec_filename))

        if not os.path.exists(zip_path):
            raise HTTPException(status_code=500, detail="Failed to create zip file")
    except Exception:
        # Don't leak the registered temp directory when archive creation
        # fails; the FileResponse background task below only runs on success.
        await TempFileManager.cleanup_temp_dir(request_id)
        raise

    return FileResponse(
        path=zip_path,
        media_type="application/zip",
        filename="dataset.zip",
        # Delete the temp directory only after the response has been sent.
        background=BackgroundTask(TempFileManager.cleanup_temp_dir, request_id),
    )
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serve the app with uvicorn when this module
    # is executed directly (imported deployments use an external server).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)