"""AI-SL: convert an uploaded document to ASL gloss, look up a sign video per
gloss token, stitch the clips into one video, and publish it to Cloudflare R2.

Gradio front-end pieces (``inputs``/``outputs``/``title`` etc.) are defined at
module level; the actual interface construction appears later in the file.
"""

from document_to_gloss import DocumentToASLConverter
from vectorizer import Vectorizer
from video_gen import create_multi_stitched_video
import gradio as gr
import asyncio
import re
import boto3
import os
from botocore.config import Config
from dotenv import load_dotenv
import requests
import tempfile
import uuid
import base64

# Load environment variables from .env file
load_dotenv()

# Cloudflare R2 (S3-compatible) credentials / endpoint
R2_ENDPOINT = os.environ.get("R2_ENDPOINT")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")

# Fail fast at import time if the deployment is misconfigured.
if not all([R2_ENDPOINT, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY]):
    raise ValueError("Missing required R2 environment variables. Please check your .env file.")

title = "AI-SL"
description = "Convert text to ASL!"
# NOTE(review): the article text was lost in the source (only an empty
# parenthesized string with a newline survived) — restore real content here.
article = "\n"

inputs = gr.File(label="Upload Document (pdf, txt, docx, or epub)")
outputs = [
    gr.JSON(label="Processing Results"),
    gr.Video(label="ASL Video Output"),
    gr.HTML(label="Download Link"),
]

asl_converter = DocumentToASLConverter()
vectorizer = Vectorizer()

# R2 is S3-compatible; region must be "auto" and signing must be SigV4.
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4'),
)


def clean_gloss_token(token):
    """Normalize one gloss token for dictionary lookup.

    Strips square brackets and newline characters, collapses runs of
    whitespace to a single space, trims, and lower-cases the result.

    Args:
        token: Raw gloss token as produced by the document converter.

    Returns:
        The cleaned token (may be an empty string).
    """
    # Remove brackets and newlines
    cleaned = re.sub(r'[\[\]\n\r]', '', token)
    # Collapse extra whitespace
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned.lower()


def upload_video_to_r2(video_path, bucket_name="ai-sl-videos"):
    """Upload a video file to R2 and return its public URL.

    Args:
        video_path: Local path of the video to upload.
        bucket_name: Target R2 bucket (defaults to "ai-sl-videos").

    Returns:
        The public URL string, or None if the upload failed.
    """
    try:
        # Unique object key so repeated uploads never collide.
        file_extension = os.path.splitext(video_path)[1]
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        with open(video_path, 'rb') as video_file:
            s3.upload_fileobj(
                video_file,
                bucket_name,
                unique_filename,
                ExtraArgs={'ACL': 'public-read'},
            )

        # NOTE(review): this assumes the account endpoint serves objects
        # publicly; R2 normally needs a public bucket domain — confirm.
        video_url = f"{R2_ENDPOINT}/{bucket_name}/{unique_filename}"
        print(f"Video uploaded to R2: {video_url}")
        return video_url
    except Exception as e:
        # Best-effort: the caller treats None as "no video available".
        print(f"Error uploading video to R2: {e}")
        return None


def video_to_base64(video_path):
    """Encode a video file as a ``data:video/mp4;base64,...`` URI.

    Args:
        video_path: Local path of the video file.

    Returns:
        The data-URI string, or None on failure.
    """
    try:
        with open(video_path, 'rb') as video_file:
            video_data = video_file.read()
        base64_data = base64.b64encode(video_data).decode('utf-8')
        return f"data:video/mp4;base64,{base64_data}"
    except Exception as e:
        print(f"Error converting video to base64: {e}")
        return None


def download_video_from_url(video_url):
    """Download a video from a public URL into a temp ``.mp4`` file.

    Args:
        video_url: Publicly reachable URL of the video.

    Returns:
        The local temp-file path, or None if the download failed.
        The caller owns the file and must remove it (see
        ``cleanup_temp_video``).
    """
    try:
        # We only need a unique path; close the handle immediately so the
        # download below can reopen it on any platform.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        temp_path = temp_file.name
        temp_file.close()

        print(f"Downloading video from: {video_url}")
        # Fix: a timeout so a dead host cannot hang the request forever.
        response = requests.get(video_url, stream=True, timeout=30)
        response.raise_for_status()

        # Stream to disk in chunks to keep memory flat for large clips.
        with open(temp_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Video downloaded to: {temp_path}")
        return temp_path
    except Exception as e:
        print(f"Error downloading video: {e}")
        return None


def cleanup_temp_video(file_path):
    """Delete a temporary video file, ignoring errors.

    Args:
        file_path: Path to remove; None or a missing path is a no-op.
    """
    try:
        if file_path and os.path.exists(file_path):
            os.unlink(file_path)
            print(f"Cleaned up: {file_path}")
    except Exception as e:
        print(f"Error cleaning up file: {e}")


async def parse_vectorize_and_search(file):
    """Gradio handler: document -> gloss tokens -> per-sign clips -> one video.

    Pipeline:
      1. Convert the uploaded document to an ASL gloss string.
      2. Clean each whitespace-separated gloss token.
      3. Vector-search each token against the sign-video index; download hits.
      4. Stitch the downloaded clips into a single video, upload it to R2,
         and clean up all local temp files.

    Args:
        file: Uploaded file object/path as provided by ``gr.File``.

    Returns:
        A (results-dict, video-url, download-html) tuple matching the
        ``outputs`` components declared above.
    """
    print(file)
    gloss = asl_converter.convert_document(file)
    print("ASL", gloss)

    # Split by spaces and keep only non-empty cleaned tokens.
    cleaned_tokens = []
    for token in gloss.split():
        cleaned = clean_gloss_token(token)
        if cleaned:
            cleaned_tokens.append(cleaned)
    print("Cleaned tokens:", cleaned_tokens)

    videos = []        # matched remote clip URLs
    video_files = []   # local temp paths for stitching
    for g in cleaned_tokens:
        print(f"Processing {g}")
        try:
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            # A single failed token should not abort the whole document.
            print(f"Error processing {g}: {e}")
            continue

    # Stitch when we have several clips; reuse the lone clip otherwise.
    stitched_video_path = None
    if len(video_files) > 1:
        try:
            print(f"Creating stitched video from {len(video_files)} videos...")
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
            stitched_video_path = tmp.name
            tmp.close()  # only the path is needed; avoid a dangling handle
            create_multi_stitched_video(video_files, stitched_video_path)
            print(f"Stitched video created: {stitched_video_path}")
        except Exception as e:
            print(f"Error creating stitched video: {e}")
            stitched_video_path = None
    elif len(video_files) == 1:
        stitched_video_path = video_files[0]

    # Publish the final video and drop every local temp file.
    final_video_url = None
    if stitched_video_path:
        final_video_url = upload_video_to_r2(stitched_video_path)
        cleanup_temp_video(stitched_video_path)
    for video_file in video_files:
        if video_file != stitched_video_path:  # final output already removed
            cleanup_temp_video(video_file)

    # NOTE(review): the original HTML template was truncated in the source;
    # reconstructed minimally around the surviving text — confirm against the
    # full file.
    download_html = ""
    if final_video_url:
        download_html = f"""<a href="{final_video_url}" download="asl_video.mp4">Download ASL Video</a>
<p>Right-click and "Save As" if the download doesn't start automatically</p>"""

    # NOTE(review): the original return statement was cut off; this tuple is
    # inferred from the declared ``outputs`` (JSON, Video, HTML) — confirm.
    results = {
        "gloss_tokens": cleaned_tokens,
        "matched_videos": videos,
        "final_video_url": final_video_url,
    }
    return results, final_video_url, download_html