Spaces:
Running
Running
from document_to_gloss import DocumentToASLConverter | |
from vectorizer import Vectorizer | |
from video_gen import create_multi_stitched_video | |
import gradio as gr | |
import asyncio | |
import re | |
import boto3 | |
import os | |
from botocore.config import Config | |
from dotenv import load_dotenv | |
import requests | |
import tempfile | |
import uuid | |
import base64 | |
# Pull variables from a local .env file into the process environment.
load_dotenv()

# Connection settings for the R2 (S3-compatible) object store.
R2_ENDPOINT = os.getenv("R2_ENDPOINT")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")

# Fail at import time if any of the three settings is missing or empty.
if not (R2_ENDPOINT and R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY):
    raise ValueError("Missing required R2 environment variables. Please check your .env file.")
# Static text displayed around the Gradio interface.
title = "AI-SL"
description = "Convert text to ASL!"
article = (
    "<p style='text-align: center'>"
    "<a href='https://github.com/deenasun' target='_blank'>Deena Sun on Github</a>"
    "</p>"
)

# One uploaded document in; JSON summary, video and download-link HTML out.
inputs = gr.File(label="Upload Document (pdf, txt, docx, or epub)")
outputs = [
    gr.JSON(label="Processing Results"),
    gr.Video(label="ASL Video Output"),
    gr.HTML(label="Download Link"),
]

# Shared, module-level helpers reused by every request.
asl_converter = DocumentToASLConverter()
vectorizer = Vectorizer()
# S3-compatible client pointed at the R2 endpoint, using SigV4 request signing.
session = boto3.session.Session()
s3 = session.client(
    's3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4'),
)
def clean_gloss_token(token):
    """
    Normalize a single gloss token.

    Square brackets, newlines and carriage returns are deleted, every run of
    whitespace is collapsed to one space, surrounding whitespace is trimmed,
    and the result is lower-cased. Returns an empty string when nothing
    is left.
    """
    # Delete the four unwanted characters in a single pass.
    without_markup = token.translate(str.maketrans("", "", "[]\n\r"))
    # split() with no arguments drops leading/trailing whitespace and treats
    # any whitespace run as one separator, so re-joining collapses the runs.
    collapsed = " ".join(without_markup.split())
    return collapsed.lower()
def upload_video_to_r2(video_path, bucket_name="ai-sl-videos"):
    """
    Upload a local video file to the given bucket under a random UUID name.

    The object key is a fresh UUID4 followed by the source file's extension.
    Returns "<R2_ENDPOINT>/<bucket>/<key>" on success, or None when anything
    goes wrong (the error is printed, not raised).

    NOTE(review): the returned URL is built from the API endpoint and the
    upload requests a 'public-read' ACL; whether that URL is actually
    reachable without credentials depends on the bucket setup - confirm.
    """
    try:
        # Keep the original extension so the stored object has a usable suffix.
        _, extension = os.path.splitext(video_path)
        object_key = str(uuid.uuid4()) + extension

        with open(video_path, 'rb') as source:
            s3.upload_fileobj(
                source,
                bucket_name,
                object_key,
                ExtraArgs={'ACL': 'public-read'},
            )

        video_url = f"{R2_ENDPOINT}/{bucket_name}/{object_key}"
        print(f"Video uploaded to R2: {video_url}")
        return video_url
    except Exception as e:
        print(f"Error uploading video to R2: {e}")
        return None
def video_to_base64(video_path):
    """
    Read a video file and return it as a "data:video/mp4;base64,..." URI.

    The whole file is loaded into memory. Returns None (after printing the
    error) if the file cannot be read or encoded.
    """
    try:
        with open(video_path, 'rb') as source:
            encoded = base64.b64encode(source.read())
        return "data:video/mp4;base64," + encoded.decode('utf-8')
    except Exception as e:
        print(f"Error converting video to base64: {e}")
        return None
def download_video_from_url(video_url, timeout=30):
    """
    Download a video from a URL into a temporary .mp4 file.

    Args:
        video_url: URL to fetch with an HTTP GET.
        timeout: Seconds to wait for the server (passed to requests.get) so
            an unresponsive host cannot block the caller forever.

    Returns:
        The path of the temporary file holding the video, or None if the
        download failed (the error is printed, not raised). The caller owns
        the returned file and is responsible for deleting it.
    """
    temp_path = None
    try:
        # Reserve a unique path; the handle is closed right away so the file
        # can be reopened for writing below.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            temp_path = temp_file.name

        print(f"Downloading video from: {video_url}")
        # The context manager releases the streamed connection on every path.
        with requests.get(video_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        print(f"Video downloaded to: {temp_path}")
        return temp_path
    except Exception as e:
        print(f"Error downloading video: {e}")
        # Do not leave an empty or partial temp file behind on failure.
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                print(f"Error cleaning up file: {cleanup_error}")
        return None
def cleanup_temp_video(file_path):
    """
    Delete a temporary video file if it exists.

    A falsy or non-existent path is silently ignored. Any error raised
    while deleting is printed rather than propagated.
    """
    try:
        # Nothing to do for None/"" or for a file that is already gone.
        if not file_path or not os.path.exists(file_path):
            return
        os.remove(file_path)
        print(f"Cleaned up: {file_path}")
    except Exception as e:
        print(f"Error cleaning up file: {e}")
def _tokenize_gloss(gloss):
    """Split a gloss string on whitespace and return the non-empty cleaned tokens."""
    return [cleaned for cleaned in map(clean_gloss_token, gloss.split()) if cleaned]


async def _fetch_sign_videos(tokens):
    """
    Look up each token and download the clip of every match.

    Returns (videos, video_files): the matched video URLs, and the local
    paths of the clips that downloaded successfully. A token whose lookup
    or download raises is reported and skipped.
    """
    videos = []
    video_files = []  # Store local file paths for stitching
    for g in tokens:
        print(f"Processing {g}")
        try:
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)
                # Download the video
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            print(f"Error processing {g}: {e}")
            continue
    return videos, video_files


def _stitch_videos(video_files):
    """
    Produce the single output video for a list of downloaded clips.

    Returns None for an empty list, the clip itself when there is exactly
    one, and otherwise the path of a new temporary file containing the
    stitched result (None if stitching fails).
    """
    if not video_files:
        return None
    if len(video_files) == 1:
        # If only one video, just use it directly
        return video_files[0]

    output_path = None
    try:
        print(f"Creating stitched video from {len(video_files)} videos...")
        # Close the handle immediately; only the reserved path is needed.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as handle:
            output_path = handle.name
        create_multi_stitched_video(video_files, output_path)
        print(f"Stitched video created: {output_path}")
        return output_path
    except Exception as e:
        print(f"Error creating stitched video: {e}")
        # Remove the reserved output file so a failed stitch does not leak it.
        cleanup_temp_video(output_path)
        return None


def _discard_local_videos(video_files, final_path):
    """Delete the final video (if any) and every downloaded clip, each once."""
    if final_path:
        cleanup_temp_video(final_path)
    for video_file in video_files:
        if video_file != final_path:  # Already removed above
            cleanup_temp_video(video_file)


def _download_link_html(href, hint):
    """Return the download-button markup for href, or "" when href is falsy."""
    if not href:
        return ""
    return f"""
    <div style="text-align: center; padding: 20px;">
        <h3>Download Your ASL Video</h3>
        <a href="{href}" download="asl_video.mp4"
           style="background-color: #4CAF50; color: white;
                  padding: 12px 24px; text-decoration: none;
                  border-radius: 4px; display: inline-block;">
            Download Video
        </a>
        <p style="margin-top: 10px; color: #666;">
            <small>{hint}</small>
        </p>
    </div>
    """


async def _build_asl_video(file):
    """
    Run the shared document -> gloss -> clips -> stitched video pipeline.

    Returns (gloss, cleaned_tokens, videos, video_files, stitched_video_path).
    """
    print(file)
    gloss = asl_converter.convert_document(file)
    print("ASL", gloss)

    # Split by spaces and clean each token
    cleaned_tokens = _tokenize_gloss(gloss)
    print("Cleaned tokens:", cleaned_tokens)

    videos, video_files = await _fetch_sign_videos(cleaned_tokens)
    # Create stitched video if we have multiple videos
    stitched_video_path = _stitch_videos(video_files)
    return gloss, cleaned_tokens, videos, video_files, stitched_video_path


async def parse_vectorize_and_search(file):
    """
    Convert a document to an ASL video and publish it to R2.

    Returns a 3-tuple matching the Gradio outputs: a JSON-serializable
    summary dict, the uploaded video's URL (None if no video was produced
    or the upload failed), and the download-link HTML ("" in that case).
    """
    gloss, cleaned_tokens, videos, video_files, stitched_video_path = (
        await _build_asl_video(file)
    )

    # Upload final video to R2 and get public URL
    final_video_url = None
    if stitched_video_path:
        final_video_url = upload_video_to_r2(stitched_video_path)
    # Local copies are no longer needed once the upload has been attempted.
    _discard_local_videos(video_files, stitched_video_path)

    # Create download link HTML
    download_html = _download_link_html(
        final_video_url,
        'Right-click and "Save As" if the download doesn\'t\n'
        'start automatically',
    )

    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "final_video_url": final_video_url
    }, final_video_url, download_html


# Create a synchronous wrapper for Gradio
def parse_vectorize_and_search_sync(file):
    """Run parse_vectorize_and_search to completion on a new event loop."""
    return asyncio.run(parse_vectorize_and_search(file))


async def parse_vectorize_and_search_base64(file):
    """
    Alternative version that returns video as base64 data instead of uploading to R2

    Returns a 3-tuple: a JSON-serializable summary dict, the video as a
    base64 data URI (None if no video was produced or encoding failed), and
    the download-link HTML ("" in that case).
    """
    gloss, cleaned_tokens, videos, video_files, stitched_video_path = (
        await _build_asl_video(file)
    )

    # Convert final video to base64
    final_video_base64 = None
    if stitched_video_path:
        final_video_base64 = video_to_base64(stitched_video_path)
    # Local copies are no longer needed once the conversion has been attempted.
    _discard_local_videos(video_files, stitched_video_path)

    # Create download link HTML for base64
    download_html = _download_link_html(
        final_video_base64,
        'Video is embedded directly - click to download',
    )

    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "video_format": "base64"
    }, final_video_base64, download_html


def parse_vectorize_and_search_base64_sync(file):
    """Run parse_vectorize_and_search_base64 to completion on a new event loop."""
    return asyncio.run(parse_vectorize_and_search_base64(file))
# Wire the R2-backed pipeline into a Gradio interface and start serving it.
intf = gr.Interface(
    parse_vectorize_and_search_sync,
    inputs,
    outputs,
    title=title,
    description=description,
    article=article,
)
intf.launch(share=True)