|
|
|
""" |
|
Hugging Face Spaces app.py - Video Transcription Service |
|
Combines Gradio interface with FastAPI for full functionality |
|
""" |
|
|
|
import gradio as gr |
|
import asyncio |
|
import threading |
|
import time |
|
import os |
|
import logging |
|
import socket |
|
from datetime import datetime |
|
from typing import Optional, Tuple |
|
import uvicorn |
|
from fastapi import FastAPI, File, UploadFile, HTTPException |
|
from fastapi.responses import JSONResponse |
|
import tempfile |
|
|
|
|
|
from config import settings |
|
from models import TranscriptionStatus, TranscriptionResponse, TranscriptionResult |
|
from storage import storage |
|
from transcription_service import transcription_service |
|
from logging_config import setup_logging, log_step, log_success, log_error |
|
|
|
def find_available_port(start_port=7860, max_attempts=100):
    """Locate a free TCP port, scanning upward from ``start_port``.

    Probes each candidate in ``[start_port, start_port + max_attempts)``;
    if every one is busy, falls back to an OS-assigned ephemeral port.
    Returns the chosen port number.
    """
    for candidate in range(start_port, start_port + max_attempts):
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(('0.0.0.0', candidate))
        except OSError:
            # Port in use (or otherwise unbindable) — try the next one.
            continue
        else:
            log_success(f"Found available port: {candidate}")
            return candidate
        finally:
            probe.close()

    # Every candidate was taken: let the kernel hand us any free port.
    fallback = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        fallback.bind(('0.0.0.0', 0))
        chosen = fallback.getsockname()[1]
        log_success(f"Using system-assigned port: {chosen}")
        return chosen
    finally:
        fallback.close()
|
|
|
|
|
# Console-only logging: Spaces captures stdout, so no log file is written.
setup_logging(level=logging.INFO, log_to_file=False)

logger = logging.getLogger(__name__)

# Default configuration for Hugging Face Spaces; pre-set env vars still win.
# NOTE(review): these defaults are applied AFTER `from config import settings`
# above — if config.py reads the environment at import time, they arrive too
# late to influence `settings`. Verify against config.py.
os.environ.setdefault("WHISPER_MODEL", "base")
os.environ.setdefault("MODEL_PRELOAD", "true")
os.environ.setdefault("DEBUG", "false")
|
|
|
|
|
# FastAPI application carrying the JSON endpoints (/transcribe, /health).
# The Gradio UI is mounted onto it at path "/" in the __main__ block below.
api_app = FastAPI(
    title="Video Transcription API",
    description="API endpoints for video transcription",
    version="1.0.0"
)
|
|
|
class TranscriptionManager:
    """Coordinates one-time, lazy loading of the Whisper model.

    Both the FastAPI endpoints and the Gradio handlers consult this object,
    so at most one preload runs even under concurrent callers.
    """

    def __init__(self):
        # True once preload_model() has completed successfully.
        self.model_loaded = False
        # True while a preload is in flight; concurrent callers poll on it.
        self.model_loading = False

    async def ensure_model_loaded(self):
        """Return True when the Whisper model is ready, loading it if needed.

        Callers that arrive while a load is already running simply wait for
        that attempt to settle instead of starting another one.
        """
        # Fast path: a previous call already finished the load.
        if self.model_loaded:
            return True

        # Someone else is loading — wait for their result.
        if self.model_loading:
            while self.model_loading:
                await asyncio.sleep(0.1)
            return self.model_loaded

        # We are the first caller: perform the load ourselves.
        self.model_loading = True
        try:
            logger.info("π€ Loading Whisper model for Hugging Face Spaces...")
            loaded = await transcription_service.preload_model()
            self.model_loaded = loaded
            return loaded
        finally:
            # Always clear the in-flight flag so waiters (and retries) proceed.
            self.model_loading = False
|
|
|
|
|
# Shared singleton tracking Whisper model load state for API and Gradio paths.
transcription_manager = TranscriptionManager()
|
|
|
|
|
# Strong references to fire-and-forget transcription tasks. Without these the
# event loop holds only a weak reference and a still-running task can be
# garbage-collected mid-flight (documented asyncio.create_task pitfall).
_background_tasks = set()


@api_app.post("/transcribe")
async def api_transcribe(file: UploadFile = File(...), language: str = None):
    """Accept an uploaded video and start transcription in the background.

    Returns a TranscriptionResponse carrying the new transcription's id;
    poll GET /transcribe/{id} for progress. Raises 503 when the model is
    unavailable, 400 for a missing filename, 413 for an oversized upload,
    and 500 for unexpected failures.
    """
    try:
        if not await transcription_manager.ensure_model_loaded():
            raise HTTPException(status_code=503, detail="Model not available")

        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        content = await file.read()
        if len(content) > settings.MAX_FILE_SIZE:
            raise HTTPException(status_code=413, detail="File too large")

        transcription_id = storage.create_transcription(language=language)

        # Run the transcription concurrently, retaining a reference so the
        # task cannot be collected before it finishes; the done-callback
        # drops the reference once it completes.
        task = asyncio.create_task(
            transcription_service.transcribe_video(content, transcription_id, language)
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

        return TranscriptionResponse(
            id=transcription_id,
            status=TranscriptionStatus.PENDING,
            message="Transcription started",
            created_at=storage.get_transcription(transcription_id).created_at
        )

    except HTTPException:
        # Re-raise deliberate HTTP errors untouched.
        raise
    except Exception as e:
        logger.error(f"API transcription error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@api_app.get("/transcribe/{transcription_id}")
async def api_get_transcription(transcription_id: int):
    """Return the stored status/result for a transcription, or 404 if absent."""
    record = storage.get_transcription(transcription_id)
    if not record:
        raise HTTPException(status_code=404, detail="Transcription not found")
    return record
|
|
|
@api_app.get("/health")
async def api_health():
    """Report service health, model readiness, and in-flight transcriptions."""
    in_flight = 0
    if hasattr(storage, '_storage'):
        # NOTE: peeks at storage's private dict — there is no public counter.
        active_states = [TranscriptionStatus.PENDING, TranscriptionStatus.PROCESSING]
        in_flight = sum(
            1 for record in storage._storage.values()
            if record.status in active_states
        )
    return {
        "status": "healthy",
        "model_loaded": transcription_manager.model_loaded,
        "active_transcriptions": in_flight
    }
|
|
|
|
|
def gradio_transcribe(video_file, language):
    """Gradio handler: start a background transcription of an uploaded video.

    Args:
        video_file: filesystem path from the gr.File component (or None).
        language: language code, or "auto" for Whisper auto-detection.

    Returns:
        (status message, transcription id string, progress text) for the UI.
    """
    if video_file is None:
        return "β Please upload a video file", "", ""

    try:
        if not transcription_manager.model_loaded:
            return "β Model not loaded yet. Please wait and try again.", "", ""

        with open(video_file, 'rb') as f:
            content = f.read()

        if len(content) > settings.MAX_FILE_SIZE:
            return f"β File too large. Maximum size: {settings.MAX_FILE_SIZE // (1024*1024)}MB", "", ""

        # "auto" means let Whisper detect the language.
        chosen_language = language if language != "auto" else None
        transcription_id = storage.create_transcription(language=chosen_language)

        # Run the async transcription on its own daemon thread with its own
        # event loop. (The previous new_event_loop()/set_event_loop() +
        # run_in_executor dance clobbered the calling thread's loop and
        # leaked a never-run, never-closed loop.)
        worker = threading.Thread(
            target=lambda: asyncio.run(
                transcription_service.transcribe_video(content, transcription_id, chosen_language)
            ),
            daemon=True,
        )
        worker.start()

        return f"β Transcription started with ID: {transcription_id}", str(transcription_id), "β³ Processing..."

    except Exception as e:
        logger.error(f"Gradio transcription error: {e}")
        return f"β Error: {str(e)}", "", ""
|
|
|
def gradio_check_status(transcription_id_str):
    """Look up a transcription by id and render its state for the Gradio UI."""
    if not transcription_id_str:
        return "β Please provide a transcription ID"

    try:
        # Parse first so a malformed id is reported as such, not as a lookup error.
        lookup_id = int(transcription_id_str)
        record = storage.get_transcription(lookup_id)

        if not record:
            return "β Transcription not found or expired"

        state = record.status
        if state == TranscriptionStatus.COMPLETED:
            return f"β Completed!\n\nLanguage: {record.language}\nDuration: {record.duration}s\n\nText:\n{record.text}"
        if state == TranscriptionStatus.FAILED:
            return f"β Failed: {record.error_message}"
        if state == TranscriptionStatus.PROCESSING:
            return "β³ Still processing... Please wait and check again."
        return "β³ Pending... Please wait and check again."

    except ValueError:
        return "β Invalid transcription ID (must be a number)"
    except Exception as e:
        return f"β Error: {str(e)}"
|
|
|
|
|
def create_gradio_interface():
    """Create the Gradio interface.

    Builds a three-tab Blocks app — upload & transcribe, status check by id,
    and static API documentation — wires the buttons to gradio_transcribe /
    gradio_check_status, and returns the gr.Blocks instance so the caller
    can mount it onto the FastAPI app.
    """

    with gr.Blocks(
        title="Video Transcription Service",
        theme=gr.themes.Soft(),
        # Only override the page width; everything else uses Soft theme defaults.
        css="""
        .gradio-container {
            max-width: 1000px !important;
        }
        """
    ) as interface:

        # Page header / feature overview shown above the tabs.
        gr.Markdown("""
        # π¬ Video Transcription Service

        Upload your video files and get accurate transcriptions using OpenAI Whisper.

        **Features:**
        - π₯ Multiple video formats (MP4, AVI, MOV, etc.)
        - π Automatic language detection or manual selection
        - π Fast processing with OpenAI Whisper
        - π± Both web interface and API access
        """)

        # --- Tab 1: upload a video and start a transcription ---
        with gr.Tab("π€ Upload & Transcribe"):
            with gr.Row():
                with gr.Column():
                    video_input = gr.File(
                        label="Upload Video File",
                        file_types=["video"],
                        type="filepath"
                    )
                    language_input = gr.Dropdown(
                        choices=["auto", "en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "zh", "ar", "hi"],
                        value="auto",
                        label="Language (auto-detect or specify)"
                    )
                    transcribe_btn = gr.Button("π Start Transcription", variant="primary")

                with gr.Column():
                    status_output = gr.Textbox(label="Status", lines=3)
                    transcription_id_output = gr.Textbox(label="Transcription ID", visible=True)
                    result_output = gr.Textbox(label="Progress", lines=2)

        # --- Tab 2: poll an existing transcription by its id ---
        with gr.Tab("π Check Status"):
            with gr.Row():
                with gr.Column():
                    id_input = gr.Textbox(label="Transcription ID", placeholder="Enter transcription ID...")
                    check_btn = gr.Button("π Check Status", variant="secondary")

                with gr.Column():
                    status_result = gr.Textbox(label="Result", lines=10)

        # --- Tab 3: static documentation for the JSON API ---
        with gr.Tab("π§ API Documentation"):
            gr.Markdown("""
            ## π API Endpoints

            You can also use this service programmatically via API calls:

            ### Upload Video for Transcription
            ```bash
            curl -X POST "https://your-space-name.hf.space/api/transcribe" \\
                 -F "[email protected]" \\
                 -F "language=en"
            ```

            ### Check Transcription Status
            ```bash
            curl "https://your-space-name.hf.space/api/transcribe/123"
            ```

            ### Health Check
            ```bash
            curl "https://your-space-name.hf.space/api/health"
            ```

            ### Python Example
            ```python
            import requests

            # Upload video
            with open('video.mp4', 'rb') as f:
                response = requests.post(
                    'https://your-space-name.hf.space/api/transcribe',
                    files={'file': f},
                    data={'language': 'en'}
                )
            transcription_id = response.json()['id']

            # Check status
            result = requests.get(f'https://your-space-name.hf.space/api/transcribe/{transcription_id}')
            print(result.json())
            ```
            """)

        # Wire the buttons to their handler functions.
        transcribe_btn.click(
            fn=gradio_transcribe,
            inputs=[video_input, language_input],
            outputs=[status_output, transcription_id_output, result_output]
        )

        check_btn.click(
            fn=gradio_check_status,
            inputs=[id_input],
            outputs=[status_result]
        )

    return interface
|
|
|
|
|
async def startup():
    """Boot-time initialization: start storage cleanup and preload Whisper.

    Logs (but does not raise on) a failed model preload so the service can
    still come up and report its state via /health.
    """
    logger.info("π Starting Video Transcription Service on Hugging Face Spaces")

    # Background task that expires old transcription records.
    await storage.start_cleanup_task()

    log_step("Preloading Whisper model")
    if await transcription_manager.ensure_model_loaded():
        log_success("Model preloaded successfully")
    else:
        log_error("Model preload failed")
|
|
|
def run_fastapi(port=None):
    """Run the FastAPI app under uvicorn (blocking).

    When ``port`` is None, scans for a free port starting at 7860.
    """
    chosen = port if port is not None else find_available_port(7860)
    log_step(f"Starting FastAPI server on port {chosen}")
    uvicorn.run(api_app, host="0.0.0.0", port=chosen, log_level="info")
|
|
|
|
|
if __name__ == "__main__":

    # Run startup() inside uvicorn's event loop instead of a throwaway
    # asyncio.run() loop: background work started there (e.g. the storage
    # cleanup task) would otherwise die when that temporary loop closed.
    api_app.add_event_handler("startup", startup)

    interface = create_gradio_interface()

    # Serve the Gradio UI at "/" on top of the FastAPI app so the web
    # interface and the JSON API share a single server.
    app_with_gradio = gr.mount_gradio_app(api_app, interface, path="/")

    # 7860 is the port Hugging Face Spaces expects the app to listen on.
    uvicorn.run(app_with_gradio, host="0.0.0.0", port=7860)
|
|