# dhravani / audio_download_api.py
# Source: coild's upload ("Upload 42 files", commit 31f1189, verified)
import os
import json
import tempfile
import zipfile
import asyncio
from typing import Optional, List, Dict, Any, Union
import pyarrow.parquet as pq
import fastapi
from fastapi import HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.background import BackgroundTask
from language_config import get_language_name, LANGUAGES
class TempFileManager:
    """Registry of per-request temporary directories, keyed by request id."""

    # Maps a request key to its live TemporaryDirectory handle; keeping the
    # handle alive prevents premature cleanup by the garbage collector.
    _temp_dirs: Dict[str, tempfile.TemporaryDirectory] = {}

    @classmethod
    def create_temp_dir(cls, key: str) -> str:
        """Create a temporary directory, register it under *key*, and return its path."""
        handle = tempfile.TemporaryDirectory()
        cls._temp_dirs[key] = handle
        return handle.name

    @classmethod
    async def cleanup_temp_dir(cls, key: str) -> None:
        """Remove and clean up the directory registered under *key* (no-op if absent)."""
        handle = cls._temp_dirs.pop(key, None)
        if handle is not None:
            # Directory removal is blocking I/O; push it off the event loop.
            await asyncio.to_thread(handle.cleanup)
class AudioDownloadService:
    """Filesystem-backed access to per-language audio datasets and their metadata."""

    # Root directory holding one subdirectory per language code.
    BASE_DATASET_DIR = "datasets"

    @classmethod
    def get_metadata_file_path(cls, language: str) -> str:
        """Return the path to the metadata Parquet file for *language*."""
        return os.path.join(cls.BASE_DATASET_DIR, language, f"{language}.parquet")

    @classmethod
    def get_audio_file_path(cls, language: str, audio_filename: str) -> str:
        """Return the full path to an audio file inside a language's dataset."""
        return os.path.join(cls.BASE_DATASET_DIR, language, "audio", audio_filename)

    @classmethod
    def read_parquet_metadata(cls, language: str,
                              filters: Optional[List[tuple]] = None) -> List[Dict[str, Any]]:
        """
        Read metadata from the Parquet file with optional filtering.

        :param language: Language code (must be a key of LANGUAGES)
        :param filters: Optional list of filter tuples pushed down to pyarrow
        :return: List of metadata dictionaries, one per recording
        :raises HTTPException: 400 for an unknown language code,
            404 when the metadata file does not exist
        """
        # Reject unknown language codes up front, before any path is built.
        if language not in LANGUAGES:
            raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

        file_path = cls.get_metadata_file_path(language)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail=f"Metadata file not found for language: {language}")

        # pyarrow applies the filters while reading, then rows come back as dicts.
        table = pq.read_table(file_path, filters=filters)
        return table.to_pylist()
# Create templates and static directories if they don't exist —
# StaticFiles/Jinja2Templates expect their directories to be present.
os.makedirs("templates", exist_ok=True)
os.makedirs("static", exist_ok=True)

app = fastapi.FastAPI(
    title="Audio Dataset Download API",
    description="API for downloading audio recordings and metadata"
)

# Mount static files directory at /static
app.mount("/static", StaticFiles(directory="static"), name="static")

# Setup Jinja2 templates (expects download.html under templates/)
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def get_download_interface(request: Request):
    """Render the HTML download interface with the available languages."""
    context = {"request": request, "languages": LANGUAGES}
    return templates.TemplateResponse("download.html", context)
@app.get("/stats")
async def get_stats(
    # User and Speaker Identification
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,
    # Audio File Metadata
    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,
    # Language and Demographic Filters
    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,
    # Verification and Timestamp
    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,
    # Misc (accepted for signature parity with /download; stats are not truncated)
    max_recordings: Optional[int] = 100
):
    """
    Get statistics about the matching recordings without downloading.

    Accepts the same filter parameters as /download and returns aggregate
    counts: total recordings, total duration in hours, and gender/age/accent
    distributions across every language that has matches.

    :raises HTTPException: 400 for an invalid language code or a
        non-numeric duration bound.
    """
    def parse_duration(label: str, raw: Optional[str]) -> Optional[float]:
        # A non-numeric duration is a client error: the bare float() call
        # previously raised ValueError and surfaced as an HTTP 500.
        if raw is None or not raw.strip():
            return None
        try:
            return float(raw)
        except ValueError:
            raise HTTPException(status_code=400,
                                detail=f"Invalid {label} value: {raw}")

    if language and language not in LANGUAGES:
        raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

    # Exact-match filters — only parameters the client actually supplied.
    filters = []
    for column, value in (
        ('user_id', user_id),
        ('username', username),
        ('speaker_id', speaker_id),
        ('speaker_name', speaker_name),
        ('audio_filename', audio_filename),
        ('audio_path', audio_path),
        ('sampling_rate', sampling_rate),
        ('language', language),
        ('gender', gender),
        ('country', country),
        ('state', state),
        ('city', city),
        ('age_group', age_group),
        ('accent', accent),
    ):
        if value:
            filters.append((column, '=', value))

    # Range filters.
    duration_min_val = parse_duration("duration_min", duration_min)
    duration_max_val = parse_duration("duration_max", duration_max)
    if duration_min_val is not None:
        filters.append(('duration', '>=', duration_min_val))
    if duration_max_val is not None:
        filters.append(('duration', '<=', duration_max_val))
    if verified is not None:
        filters.append(('verified', '=', verified))
    # NOTE(review): string comparison assumes timestamps sort lexicographically
    # (e.g. ISO 8601) — confirm against the stored format.
    if timestamp_min:
        filters.append(('timestamp', '>=', timestamp_min))
    if timestamp_max:
        filters.append(('timestamp', '<=', timestamp_max))

    # Search only the requested language, or all known languages.
    languages_to_search = [language] if language else list(LANGUAGES.keys())

    total_recordings = 0
    total_duration = 0.0
    languages_found = set()
    genders: Dict[str, int] = {}
    age_groups: Dict[str, int] = {}
    accents: Dict[str, int] = {}

    for lang in languages_to_search:
        lang_filters = filters.copy()
        if not any(f[0] == 'language' for f in lang_filters):
            lang_filters.append(('language', '=', lang))
        try:
            recordings = AudioDownloadService.read_parquet_metadata(lang, lang_filters)
        except HTTPException:
            # Skip languages that have no metadata file.
            continue
        if not recordings:
            continue
        languages_found.add(lang)
        total_recordings += len(recordings)
        for rec in recordings:
            # Treat a missing or null duration as zero instead of raising
            # (rec.get('duration', 0) returned None for explicit nulls).
            total_duration += rec.get('duration') or 0
            for bucket, key in ((genders, 'gender'),
                                (age_groups, 'age_group'),
                                (accents, 'accent')):
                label = rec.get(key) or 'Unknown'
                bucket[label] = bucket.get(label, 0) + 1

    return {
        "total_recordings": total_recordings,
        "total_duration_hours": round(total_duration / 3600, 2),
        "languages": list(languages_found),
        "language_count": len(languages_found),
        "gender_distribution": genders,
        "age_distribution": age_groups,
        "accent_distribution": accents
    }
@app.get("/download")
async def bulk_download(
    # User and Speaker Identification
    user_id: Optional[str] = None,
    username: Optional[str] = None,
    speaker_id: Optional[str] = None,
    speaker_name: Optional[str] = None,
    # Audio File Metadata
    audio_filename: Optional[str] = None,
    audio_path: Optional[str] = None,
    sampling_rate: Optional[str] = None,
    duration_min: Optional[str] = None,
    duration_max: Optional[str] = None,
    # Language and Demographic Filters
    language: Optional[str] = None,
    gender: Optional[str] = None,
    country: Optional[str] = None,
    state: Optional[str] = None,
    city: Optional[str] = None,
    age_group: Optional[str] = None,
    accent: Optional[str] = None,
    # Verification and Timestamp
    verified: Optional[bool] = None,
    timestamp_min: Optional[str] = None,
    timestamp_max: Optional[str] = None,
    # Misc
    max_recordings: Optional[int] = 100
):
    """
    Bulk download audio files and metadata based on comprehensive filtering criteria.

    Builds a zip in a per-request temporary directory containing a
    metadata.jsonl file plus every matching audio file found on disk,
    then streams it back; the temp directory is removed by a background
    task after the response is sent.

    :raises HTTPException: 400 for an invalid language code or a
        non-numeric duration bound, 404 when nothing matches,
        500 if the zip could not be created.
    """
    def parse_duration(label: str, raw: Optional[str]) -> Optional[float]:
        # A non-numeric duration is a client error: the bare float() call
        # previously raised ValueError and surfaced as an HTTP 500.
        if raw is None or not raw.strip():
            return None
        try:
            return float(raw)
        except ValueError:
            raise HTTPException(status_code=400,
                                detail=f"Invalid {label} value: {raw}")

    if language and language not in LANGUAGES:
        raise HTTPException(status_code=400, detail=f"Invalid language code: {language}")

    # Exact-match filters — only parameters the client actually supplied.
    filters = []
    for column, value in (
        ('user_id', user_id),
        ('username', username),
        ('speaker_id', speaker_id),
        ('speaker_name', speaker_name),
        ('audio_filename', audio_filename),
        ('audio_path', audio_path),
        ('sampling_rate', sampling_rate),
        ('language', language),
        ('gender', gender),
        ('country', country),
        ('state', state),
        ('city', city),
        ('age_group', age_group),
        ('accent', accent),
    ):
        if value:
            filters.append((column, '=', value))

    # Range filters.
    duration_min_val = parse_duration("duration_min", duration_min)
    duration_max_val = parse_duration("duration_max", duration_max)
    if duration_min_val is not None:
        filters.append(('duration', '>=', duration_min_val))
    if duration_max_val is not None:
        filters.append(('duration', '<=', duration_max_val))
    if verified is not None:
        filters.append(('verified', '=', verified))
    # NOTE(review): string comparison assumes timestamps sort lexicographically
    # (e.g. ISO 8601) — confirm against the stored format.
    if timestamp_min:
        filters.append(('timestamp', '>=', timestamp_min))
    if timestamp_max:
        filters.append(('timestamp', '<=', timestamp_max))

    # Search only the requested language, or all known languages.
    languages_to_search = [language] if language else list(LANGUAGES.keys())

    # Collect matching metadata across languages, capped at max_recordings.
    matched_recordings = []
    for lang in languages_to_search:
        lang_filters = filters.copy()
        if not any(f[0] == 'language' for f in lang_filters):
            lang_filters.append(('language', '=', lang))
        try:
            recordings = AudioDownloadService.read_parquet_metadata(lang, lang_filters)
        except HTTPException:
            # Skip languages that have no metadata file.
            continue
        matched_recordings.extend(recordings[:max_recordings])
        if len(matched_recordings) >= max_recordings:
            matched_recordings = matched_recordings[:max_recordings]
            break

    if not matched_recordings:
        raise HTTPException(status_code=404, detail="No recordings match the given criteria")

    # Per-request temp dir, cleaned up by the FileResponse background task.
    request_id = os.urandom(16).hex()
    tmpdirname = TempFileManager.create_temp_dir(request_id)

    # NOTE(review): the archive is built synchronously inside an async
    # handler and blocks the event loop for large selections — consider
    # moving the zip work to asyncio.to_thread.
    zip_path = os.path.join(tmpdirname, "dataset.zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Write all matched metadata as JSON Lines and add it to the archive.
        metadata_path = os.path.join(tmpdirname, "metadata.jsonl")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            for record in matched_recordings:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        zipf.write(metadata_path, "metadata.jsonl")

        # Add each referenced audio file; files missing on disk are skipped.
        # (Loop variables renamed: the original rebound the audio_filename
        # and language endpoint parameters here.)
        for record in matched_recordings:
            rec_filename = record['audio_filename']
            rec_language = record['language']
            full_audio_path = AudioDownloadService.get_audio_file_path(rec_language, rec_filename)
            if os.path.exists(full_audio_path):
                zipf.write(full_audio_path, os.path.join(rec_language, "audio", rec_filename))

    if not os.path.exists(zip_path):
        raise HTTPException(status_code=500, detail="Failed to create zip file")

    # Stream the archive; cleanup_temp_dir runs after the response is sent.
    return FileResponse(
        path=zip_path,
        media_type="application/zip",
        filename="dataset.zip",
        background=BackgroundTask(TempFileManager.cleanup_temp_dir, request_id)
    )
if __name__ == "__main__":
    # Run the development server directly: bind all interfaces on port 8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)