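"""Gradio app for AI-SL.

Converts input text or an uploaded document (pdf, txt, docx, or epub) to ASL
gloss, looks up a sign video for each gloss token via a vector search, stitches
the matched clips into one video, and uploads the result to Cloudflare R2 for
download.
"""
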
import asyncio
import base64
import os
import re
import tempfile
import uuid

import boto3
import gradio as gr
import requests
from botocore.config import Config
from dotenv import load_dotenv

from document_to_gloss import DocumentToASLConverter
from vectorizer import Vectorizer
from video_gen import create_multi_stitched_video

# Load environment variables from .env file
load_dotenv()
# Load R2/S3 environment secrets
R2_ASL_VIDEOS_URL = os.environ.get("R2_ASL_VIDEOS_URL")
R2_ENDPOINT = os.environ.get("R2_ENDPOINT")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")
# Validate that required environment variables are set
if not all([R2_ASL_VIDEOS_URL, R2_ENDPOINT, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY]):
    raise ValueError(
        "Missing required R2 environment variables. "
        "Please check your .env file."
    )

title = "AI-SL"
description = "Convert text to ASL!"
article = ("<p style='text-align: center'><a href='https://github.com/deenasun' "
"target='_blank'>Deena Sun on Github</a></p>")
inputs = gr.File(label="Upload Document (pdf, txt, docx, or epub)")
outputs = [
    gr.JSON(label="Processing Results"),
    gr.Video(label="ASL Video Output"),
    gr.HTML(label="Download Link")
]

asl_converter = DocumentToASLConverter()
vectorizer = Vectorizer()
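# Cloudflare R2 exposes an S3-compatible API, so a standard boto3 S3 client
# pointed at the R2 endpoint (with SigV4 signing) is used for uploads.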
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4')
)

def clean_gloss_token(token):
    """
    Clean a gloss token: strip brackets, newlines, and extra whitespace,
    then lowercase it
    """
    # Remove brackets and newlines
    cleaned = re.sub(r'[\[\]\n\r]', '', token)
    # Remove extra whitespace
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    cleaned = cleaned.lower()
    return cleaned


def upload_video_to_r2(video_path, bucket_name="asl-videos"):
    """
    Upload a video file to R2 and return a public URL
    """
    try:
        # Generate a unique filename
        file_extension = os.path.splitext(video_path)[1]
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Upload to R2
        with open(video_path, 'rb') as video_file:
            s3.upload_fileobj(
                video_file,
                bucket_name,
                unique_filename,
                ExtraArgs={'ACL': 'public-read'}
            )

        # Log the storage-side URL derived from the account endpoint; the URL
        # returned to callers is built from the public R2_ASL_VIDEOS_URL domain
        public_domain = R2_ENDPOINT.replace('https://', '').split('.')[0]
        video_url = f"https://{public_domain}.r2.cloudflarestorage.com/{bucket_name}/{unique_filename}"
        print(f"Video uploaded to R2: {video_url}")

        public_video_url = f"{R2_ASL_VIDEOS_URL}/{unique_filename}"
        return public_video_url
    except Exception as e:
        print(f"Error uploading video to R2: {e}")
        return None


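# Alternative delivery path: inline the video as a data URI. Not currently
# wired into the Gradio outputs, which use the R2 URL instead.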
def video_to_base64(video_path):
    """
    Convert a video file to base64 string for direct download
    """
    try:
        with open(video_path, 'rb') as video_file:
            video_data = video_file.read()
        base64_data = base64.b64encode(video_data).decode('utf-8')
        return f"data:video/mp4;base64,{base64_data}"
    except Exception as e:
        print(f"Error converting video to base64: {e}")
        return None


def download_video_from_url(video_url):
    """
    Download a video from a public R2 URL
    Returns the local file path where the video is saved
    """
    try:
        # Create a temporary file with .mp4 extension
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        temp_path = temp_file.name
        temp_file.close()

        # Download the video
        print(f"Downloading video from: {video_url}")
        response = requests.get(video_url, stream=True)
        response.raise_for_status()

        # Save to temporary file
        with open(temp_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Video downloaded to: {temp_path}")
        return temp_path
    except Exception as e:
        print(f"Error downloading video: {e}")
        return None


def cleanup_temp_video(file_path):
    """
    Clean up temporary video file
    """
    try:
        if file_path and os.path.exists(file_path):
            os.unlink(file_path)
            print(f"Cleaned up: {file_path}")
    except Exception as e:
        print(f"Error cleaning up file: {e}")


def process_text_to_gloss(text):
    """
    Convert text directly to ASL gloss
    """
    try:
        # Write the text to a temporary file so the existing document
        # converter can handle it like any other upload
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.txt', delete=False
        ) as temp_file:
            temp_file.write(text)
            temp_file_path = temp_file.name

        # Use the existing document converter
        gloss = asl_converter.convert_document(temp_file_path)

        # Clean up the temporary file
        os.unlink(temp_file_path)

        return gloss
    except Exception as e:
        print(f"Error processing text: {e}")
        return None


def process_input(input_data):
    """
    Process either text input or file upload
    input_data can be either a string (text) or a file object
    """
    if input_data is None:
        return None

    # Check if it's a file object (has .name attribute)
    if hasattr(input_data, 'name'):
        # It's a file upload
        print(f"Processing file: {input_data.name}")
        return asl_converter.convert_document(input_data.name)
    else:
        # It's text input
        print(f"Processing text input: {input_data[:100]}...")
        return process_text_to_gloss(input_data)


async def parse_vectorize_and_search_unified(input_data):
    """
    Unified function that handles both text and file inputs
    """
    print(f"Input type: {type(input_data)}")

    # Process the input to get gloss
    gloss = process_input(input_data)
    if not gloss:
        return {
            "status": "error",
            "message": "Failed to process input"
        }, None, ""

    print("ASL", gloss)

    # Split by spaces and clean each token
    gloss_tokens = gloss.split()
    cleaned_tokens = []
    for token in gloss_tokens:
        cleaned = clean_gloss_token(token)
        if cleaned:  # Only add non-empty tokens
            cleaned_tokens.append(cleaned)
    print("Cleaned tokens:", cleaned_tokens)

    videos = []
    video_files = []  # Store local file paths for stitching
    for g in cleaned_tokens:
        print(f"Processing {g}")
        try:
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)
                # Download the video
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            print(f"Error processing {g}: {e}")
            continue

    # Create stitched video if we have multiple videos
    stitched_video_path = None
    if len(video_files) > 1:
        try:
            print(f"Creating stitched video from {len(video_files)} videos...")
            stitched_video_path = tempfile.NamedTemporaryFile(
                delete=False, suffix='.mp4'
            ).name
            create_multi_stitched_video(video_files, stitched_video_path)
            print(f"Stitched video created: {stitched_video_path}")
        except Exception as e:
            print(f"Error creating stitched video: {e}")
            stitched_video_path = None
    elif len(video_files) == 1:
        # If only one video, just use it directly
        stitched_video_path = video_files[0]

    # Upload final video to R2 and get public URL
    final_video_url = None
    if stitched_video_path:
        final_video_url = upload_video_to_r2(stitched_video_path)
        # Clean up the local file after upload
        cleanup_temp_video(stitched_video_path)

    # Clean up individual video files after stitching
    for video_file in video_files:
        if video_file != stitched_video_path:  # Don't delete the final output
            cleanup_temp_video(video_file)

    # Create download link HTML
    download_html = ""
    if final_video_url:
        download_html = f"""
        <div style="text-align: center; padding: 20px;">
            <h3>Download Your ASL Video</h3>
            <a href="{final_video_url}" download="asl_video.mp4"
               style="background-color: #4CAF50; color: white;
                      padding: 12px 24px; text-decoration: none;
                      border-radius: 4px; display: inline-block;">
                Download Video
            </a>
            <p style="margin-top: 10px; color: #666;">
                <small>Right-click and "Save As" if the download doesn't
                start automatically</small>
            </p>
        </div>
        """

    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "final_video_url": final_video_url
    }, final_video_url, download_html


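# The vector search is async, so wrap the pipeline with asyncio.run for use in
# the synchronous Gradio click handler.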
def parse_vectorize_and_search_unified_sync(input_data):
    return asyncio.run(parse_vectorize_and_search_unified(input_data))


def predict_unified(input_data):
    """
    Unified prediction function that handles both text and file inputs
    """
    try:
        if input_data is None:
            return {
                "status": "error",
                "message": "Please provide text or upload a document"
            }, None, ""

        # Use the unified processing function
        result = parse_vectorize_and_search_unified_sync(input_data)
        return result
    except Exception as e:
        print(f"Error in predict_unified function: {e}")
        return {
            "status": "error",
            "message": f"An error occurred: {str(e)}"
        }, None, ""


# Create the Gradio interface
def create_interface():
    """Create and configure the Gradio interface"""
    with gr.Blocks(title=title) as demo:
        gr.Markdown(f"# {title}")
        gr.Markdown(description)

        with gr.Row():
            with gr.Column():
                # Input section
                gr.Markdown("## Input Options")

                # Text input
                gr.Markdown("### Option 1: Enter Text")
                text_input = gr.Textbox(
                    label="Enter text to convert to ASL",
                    placeholder="Type or paste your text here...",
                    lines=5,
                    max_lines=10
                )

                gr.Markdown("### Option 2: Upload Document")
                file_input = gr.File(
                    label="Upload Document (pdf, txt, docx, or epub)",
                    file_types=[".pdf", ".txt", ".docx", ".epub"]
                )

                # Processing options
                gr.Markdown("## Processing Options")
                use_r2 = gr.Checkbox(
                    label="Use Cloud Storage (R2)",
                    value=True,
                    info=("Upload video to cloud storage for "
                          "persistent access")
                )

                process_btn = gr.Button(
                    "Generate ASL Video",
                    variant="primary"
                )

            with gr.Column():
                # Output section
                gr.Markdown("## Results")
                json_output = gr.JSON(label="Processing Results")
                video_output = gr.Video(label="ASL Video Output")
                download_html = gr.HTML(label="Download Link")

        # Handle the processing
        def process_inputs(text, file, use_r2_storage):
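            # Note: use_r2_storage is accepted for the checkbox wiring but is
            # not consulted; the pipeline currently always uploads to R2.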
            # Determine which input to use
            if text and text.strip():
                # Use text input
                input_data = text.strip()
            elif file is not None:
                # Use file input
                input_data = file
            else:
                # No input provided
                return {
                    "status": "error",
                    "message": "Please provide either text or upload a file"
                }, None, ""

            # Process using the unified function
            return predict_unified(input_data)

        process_btn.click(
            fn=process_inputs,
            inputs=[text_input, file_input, use_r2],
            outputs=[json_output, video_output, download_html]
        )

        # Footer
        gr.Markdown(article)

    return demo


# For Hugging Face Spaces, use the Blocks interface
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True  # Set to True for local testing with public URL
    )