#!/usr/bin/env python3 """ API Client for Hugging Face Spaces Video Transcription Service Test both web interface and API functionality """ import requests import time import sys import os from datetime import datetime class HFTranscriptionClient: def __init__(self, space_url): """ Initialize client for HF Spaces transcription service Args: space_url: Your HF Space URL (e.g., "https://username-spacename.hf.space") """ self.base_url = space_url.rstrip('/') self.api_base = f"{self.base_url}/api" def health_check(self): """Check if the service is healthy""" try: response = requests.get(f"{self.api_base}/health", timeout=10) if response.status_code == 200: health = response.json() print("āœ… Service is healthy") print(f" Model loaded: {health.get('model_loaded', False)}") print(f" Active transcriptions: {health.get('active_transcriptions', 0)}") return True else: print(f"āŒ Health check failed: {response.status_code}") return False except requests.exceptions.RequestException as e: print(f"āŒ Cannot connect to service: {e}") return False def transcribe_video(self, video_path, language=None): """ Upload video for transcription Args: video_path: Path to video file language: Language code (e.g., 'en', 'es') or None for auto-detect Returns: dict: Response with transcription ID or error """ if not os.path.exists(video_path): return {"error": f"Video file not found: {video_path}"} try: print(f"šŸ“¤ Uploading video: {video_path}") with open(video_path, 'rb') as f: files = {'file': f} data = {} if language: data['language'] = language response = requests.post( f"{self.api_base}/transcribe", files=files, data=data, timeout=60 ) if response.status_code == 200: result = response.json() print(f"āœ… Upload successful! Transcription ID: {result['id']}") return result else: error_msg = f"Upload failed: {response.status_code}" if response.text: error_msg += f" - {response.text}" print(f"āŒ {error_msg}") return {"error": error_msg} except requests.exceptions.RequestException as e: error_msg = f"Upload error: {e}" print(f"āŒ {error_msg}") return {"error": error_msg} def get_transcription_status(self, transcription_id): """ Get transcription status and results Args: transcription_id: ID returned from transcribe_video Returns: dict: Transcription status and results """ try: response = requests.get( f"{self.api_base}/transcribe/{transcription_id}", timeout=10 ) if response.status_code == 200: return response.json() elif response.status_code == 404: return {"error": "Transcription not found or expired"} else: return {"error": f"Status check failed: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Status check error: {e}"} def wait_for_completion(self, transcription_id, max_wait_minutes=15, poll_interval=10): """ Wait for transcription to complete Args: transcription_id: ID to monitor max_wait_minutes: Maximum time to wait poll_interval: Seconds between status checks Returns: dict: Final transcription result """ print(f"ā³ Waiting for transcription {transcription_id} to complete...") print(f" Max wait time: {max_wait_minutes} minutes") print(f" Checking every {poll_interval} seconds") start_time = time.time() max_wait_seconds = max_wait_minutes * 60 while time.time() - start_time < max_wait_seconds: result = self.get_transcription_status(transcription_id) if "error" in result: print(f"āŒ Error checking status: {result['error']}") return result status = result.get('status', 'unknown') print(f" Status: {status}") if status == 'completed': print("šŸŽ‰ Transcription completed!") return result elif status == 'failed': error_msg = result.get('error_message', 'Unknown error') print(f"āŒ Transcription failed: {error_msg}") return result elif status in ['pending', 'processing']: time.sleep(poll_interval) else: print(f"āŒ Unknown status: {status}") return result print(f"ā° Transcription timed out after {max_wait_minutes} minutes") return {"error": "Timeout waiting for completion"} def transcribe_and_wait(self, video_path, language=None, max_wait_minutes=15): """ Upload video and wait for transcription to complete Args: video_path: Path to video file language: Language code or None for auto-detect max_wait_minutes: Maximum time to wait Returns: dict: Complete transcription result """ # Upload video upload_result = self.transcribe_video(video_path, language) if "error" in upload_result: return upload_result transcription_id = upload_result['id'] # Wait for completion return self.wait_for_completion(transcription_id, max_wait_minutes) def main(): """Main function for testing the HF Spaces API""" if len(sys.argv) < 2: print("Hugging Face Spaces Video Transcription API Client") print("=" * 50) print("Usage:") print(" python hf_api_client.py [video_file] [language]") print() print("Examples:") print(" python hf_api_client.py https://username-spacename.hf.space") print(" python hf_api_client.py https://username-spacename.hf.space video.mp4") print(" python hf_api_client.py https://username-spacename.hf.space video.mp4 en") print() print("Commands:") print(" health - Check service health") print(" test - Run basic functionality test") sys.exit(1) space_url = sys.argv[1] client = HFTranscriptionClient(space_url) print(f"🌐 Connecting to: {space_url}") print("=" * 50) # Health check if not client.health_check(): print("āŒ Service is not available. Please check your Space URL and try again.") sys.exit(1) # If video file provided, transcribe it if len(sys.argv) >= 3: video_file = sys.argv[2] language = sys.argv[3] if len(sys.argv) > 3 else None print(f"\nšŸŽ¬ Transcribing video: {video_file}") if language: print(f"🌐 Language: {language}") else: print("🌐 Language: auto-detect") result = client.transcribe_and_wait(video_file, language) if "error" in result: print(f"āŒ Transcription failed: {result['error']}") else: print("\nšŸŽ‰ Transcription Results:") print("=" * 30) print(f"ID: {result.get('id', 'N/A')}") print(f"Language: {result.get('language', 'N/A')}") print(f"Duration: {result.get('duration', 'N/A')} seconds") print(f"Status: {result.get('status', 'N/A')}") print("\nTranscribed Text:") print("-" * 20) print(result.get('text', 'No text available')) # Save to file if result.get('text'): output_file = f"{os.path.splitext(video_file)[0]}_transcription.txt" with open(output_file, 'w', encoding='utf-8') as f: f.write(f"Transcription of: {video_file}\n") f.write(f"Language: {result.get('language', 'N/A')}\n") f.write(f"Duration: {result.get('duration', 'N/A')} seconds\n") f.write(f"Completed: {datetime.now().isoformat()}\n") f.write("\n" + "=" * 50 + "\n") f.write(result['text']) print(f"\nšŸ’¾ Transcription saved to: {output_file}") else: print("\nāœ… Service is ready!") print("🌐 Web interface:", space_url) print("šŸ”— API base URL:", client.api_base) print("\nšŸ“‹ To transcribe a video:") print(f" python {sys.argv[0]} {space_url} your_video.mp4") if __name__ == "__main__": main()