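"""Gradio app for AI-SL.

Converts uploaded documents or raw text to ASL gloss, looks up a matching
sign video for each gloss token, stitches the clips into a single video,
and uploads the result to Cloudflare R2 for download.
"""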
from document_to_gloss import DocumentToASLConverter
from vectorizer import Vectorizer
from video_gen import create_multi_stitched_video
import gradio as gr
import asyncio
import re
import boto3
import os
from botocore.config import Config
from dotenv import load_dotenv
import requests
import tempfile
import uuid
import base64

# Load environment variables from .env file
load_dotenv()

# Load R2/S3 environment secrets
R2_ASL_VIDEOS_URL = os.environ.get("R2_ASL_VIDEOS_URL")
R2_ENDPOINT = os.environ.get("R2_ENDPOINT")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")

# Validate that required environment variables are set
if not all([R2_ASL_VIDEOS_URL, R2_ENDPOINT, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY]):
    raise ValueError(
        "Missing required R2 environment variables. "
        "Please check your .env file."
    )
title = "AI-SL"
description = "Convert text to ASL!"
article = ("<p style='text-align: center'><a href='https://github.com/deenasun' "
           "target='_blank'>Deena Sun on Github</a></p>")
inputs = gr.File(label="Upload Document (pdf, txt, docx, or epub)")
outputs = [
    gr.JSON(label="Processing Results"),
    gr.Video(label="ASL Video Output"),
    gr.HTML(label="Download Link")
]
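
# Module-level singletons: the document-to-gloss converter, the vectorizer
# used for gloss-to-video lookup, and an S3-compatible client for Cloudflare R2.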
asl_converter = DocumentToASLConverter()
vectorizer = Vectorizer()

session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4')
)


def clean_gloss_token(token):
    """
    Clean a gloss token by removing brackets, newlines, and extra
    whitespace, then lowercasing the result
    """
    # Remove brackets and newlines
    cleaned = re.sub(r'[\[\]\n\r]', '', token)
    # Remove extra whitespace
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    cleaned = cleaned.lower()
    return cleaned


def upload_video_to_r2(video_path, bucket_name="asl-videos"):
    """
    Upload a video file to R2 and return a public URL
    """
    try:
        # Generate a unique filename
        file_extension = os.path.splitext(video_path)[1]
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Upload to R2
        with open(video_path, 'rb') as video_file:
            s3.upload_fileobj(
                video_file,
                bucket_name,
                unique_filename,
                ExtraArgs={'ACL': 'public-read'}
            )

        # Build the raw storage URL from the endpoint (logged for debugging only)
        public_domain = R2_ENDPOINT.replace('https://', '').split('.')[0]
        video_url = f"https://{public_domain}.r2.cloudflarestorage.com/{bucket_name}/{unique_filename}"
        print(f"Video uploaded to R2: {video_url}")

        # The returned URL is served from the configured public R2 domain
        public_video_url = f"{R2_ASL_VIDEOS_URL}/{unique_filename}"
        return public_video_url
    except Exception as e:
        print(f"Error uploading video to R2: {e}")
        return None
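

# Helper to inline a video as a base64 data URI for direct download.
# Not currently called by the UI flow below, which links to the R2 URL instead.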
def video_to_base64(video_path):
    """
    Convert a video file to base64 string for direct download
    """
    try:
        with open(video_path, 'rb') as video_file:
            video_data = video_file.read()
            base64_data = base64.b64encode(video_data).decode('utf-8')
            return f"data:video/mp4;base64,{base64_data}"
    except Exception as e:
        print(f"Error converting video to base64: {e}")
        return None


def download_video_from_url(video_url):
    """
    Download a video from a public R2 URL
    Returns the local file path where the video is saved
    """
    try:
        # Create a temporary file with .mp4 extension
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        temp_path = temp_file.name
        temp_file.close()

        # Download the video
        print(f"Downloading video from: {video_url}")
        response = requests.get(video_url, stream=True)
        response.raise_for_status()

        # Save to temporary file
        with open(temp_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Video downloaded to: {temp_path}")
        return temp_path
    except Exception as e:
        print(f"Error downloading video: {e}")
        return None


def cleanup_temp_video(file_path):
    """
    Clean up temporary video file
    """
    try:
        if file_path and os.path.exists(file_path):
            os.unlink(file_path)
            print(f"Cleaned up: {file_path}")
    except Exception as e:
        print(f"Error cleaning up file: {e}")


def process_text_to_gloss(text):
    """
    Convert text directly to ASL gloss
    """
    try:
        # Write the text to a temporary file so it can go through the same
        # document converter used for file uploads
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.txt', delete=False
        ) as temp_file:
            temp_file.write(text)
            temp_file_path = temp_file.name

        # Use the existing document converter
        gloss = asl_converter.convert_document(temp_file_path)

        # Clean up the temporary file
        os.unlink(temp_file_path)

        return gloss
    except Exception as e:
        print(f"Error processing text: {e}")
        return None


def process_input(input_data):
    """
    Process either text input or file upload
    input_data can be either a string (text) or a file object
    """
    if input_data is None:
        return None

    # Check if it's a file object (has .name attribute)
    if hasattr(input_data, 'name'):
        # It's a file upload
        print(f"Processing file: {input_data.name}")
        return asl_converter.convert_document(input_data.name)
    else:
        # It's text input
        print(f"Processing text input: {input_data[:100]}...")
        return process_text_to_gloss(input_data)


async def parse_vectorize_and_search_unified(input_data):
    """
    Unified function that handles both text and file inputs
    """
    print(f"Input type: {type(input_data)}")

    # Process the input to get gloss
    gloss = process_input(input_data)

    if not gloss:
        return {
            "status": "error",
            "message": "Failed to process input"
        }, None, ""

    print("ASL", gloss)

    # Split by spaces and clean each token
    gloss_tokens = gloss.split()
    cleaned_tokens = []
    for token in gloss_tokens:
        cleaned = clean_gloss_token(token)
        if cleaned:  # Only add non-empty tokens
            cleaned_tokens.append(cleaned)

    print("Cleaned tokens:", cleaned_tokens)

    videos = []
    video_files = []  # Store local file paths for stitching

    for g in cleaned_tokens:
        print(f"Processing {g}")
        try:
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)

                # Download the video
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            print(f"Error processing {g}: {e}")
            continue

    # Create stitched video if we have multiple videos
    stitched_video_path = None
    if len(video_files) > 1:
        try:
            print(f"Creating stitched video from {len(video_files)} videos...")
            stitched_video_path = tempfile.NamedTemporaryFile(
                delete=False, suffix='.mp4'
            ).name
            create_multi_stitched_video(video_files, stitched_video_path)
            print(f"Stitched video created: {stitched_video_path}")
        except Exception as e:
            print(f"Error creating stitched video: {e}")
            stitched_video_path = None
    elif len(video_files) == 1:
        # If only one video, just use it directly
        stitched_video_path = video_files[0]

    # Upload final video to R2 and get public URL
    final_video_url = None
    if stitched_video_path:
        final_video_url = upload_video_to_r2(stitched_video_path)
        # Clean up the local file after upload
        cleanup_temp_video(stitched_video_path)

    # Clean up individual video files after stitching
    for video_file in video_files:
        if video_file != stitched_video_path:  # Don't delete the final output
            cleanup_temp_video(video_file)

    # Create download link HTML
    download_html = ""
    if final_video_url:
        download_html = f"""
        <div style="text-align: center; padding: 20px;">
            <h3>Download Your ASL Video</h3>
            <a href="{final_video_url}" download="asl_video.mp4"
               style="background-color: #4CAF50; color: white;
                      padding: 12px 24px; text-decoration: none;
                      border-radius: 4px; display: inline-block;">
                Download Video
            </a>
            <p style="margin-top: 10px; color: #666;">
                <small>Right-click and "Save As" if the download doesn't
                start automatically</small>
            </p>
        </div>
        """

    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "final_video_url": final_video_url
    }, final_video_url, download_html
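

# Synchronous wrapper so the async pipeline can be used as a Gradio callback.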
def parse_vectorize_and_search_unified_sync(input_data):
    return asyncio.run(parse_vectorize_and_search_unified(input_data))


def predict_unified(input_data):
    """
    Unified prediction function that handles both text and file inputs
    """
    try:
        if input_data is None:
            return {
                "status": "error",
                "message": "Please provide text or upload a document"
            }, None, ""

        # Use the unified processing function
        result = parse_vectorize_and_search_unified_sync(input_data)
        return result
    except Exception as e:
        print(f"Error in predict_unified function: {e}")
        return {
            "status": "error",
            "message": f"An error occurred: {str(e)}"
        }, None, ""


# Create the Gradio interface
def create_interface():
    """Create and configure the Gradio interface"""
    with gr.Blocks(title=title) as demo:
        gr.Markdown(f"# {title}")
        gr.Markdown(description)

        with gr.Row():
            with gr.Column():
                # Input section
                gr.Markdown("## Input Options")

                # Text input
                gr.Markdown("### Option 1: Enter Text")
                text_input = gr.Textbox(
                    label="Enter text to convert to ASL",
                    placeholder="Type or paste your text here...",
                    lines=5,
                    max_lines=10
                )

                gr.Markdown("### Option 2: Upload Document")
                file_input = gr.File(
                    label="Upload Document (pdf, txt, docx, or epub)",
                    file_types=[".pdf", ".txt", ".docx", ".epub"]
                )

                # Processing options
                gr.Markdown("## Processing Options")
                use_r2 = gr.Checkbox(
                    label="Use Cloud Storage (R2)",
                    value=True,
                    info=("Upload video to cloud storage for "
                          "persistent access")
                )

                process_btn = gr.Button(
                    "Generate ASL Video",
                    variant="primary"
                )

            with gr.Column():
                # Output section
                gr.Markdown("## Results")
                json_output = gr.JSON(label="Processing Results")
                video_output = gr.Video(label="ASL Video Output")
                download_html = gr.HTML(label="Download Link")

        # Handle the processing
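        # Note: use_r2_storage receives the checkbox value but is not yet
        # acted on; the pipeline currently always uploads the result to R2.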
        def process_inputs(text, file, use_r2_storage):
            # Determine which input to use
            if text and text.strip():
                # Use text input
                input_data = text.strip()
            elif file is not None:
                # Use file input
                input_data = file
            else:
                # No input provided
                return {
                    "status": "error",
                    "message": "Please provide either text or upload a file"
                }, None, ""

            # Process using the unified function
            return predict_unified(input_data)

        process_btn.click(
            fn=process_inputs,
            inputs=[text_input, file_input, use_r2],
            outputs=[json_output, video_output, download_html]
        )

        # Footer
        gr.Markdown(article)

    return demo


# For Hugging Face Spaces, use the Blocks interface
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True  # Set to True for local testing with public URL
    )