# ai-sl-api / app.py
# Hugging Face Space by deenasun — commit dadcb61
# "add cloudflare upload and base64 for video output response to gradio"
# (raw / history / blame — 11.7 kB)
from document_to_gloss import DocumentToASLConverter
from vectorizer import Vectorizer
from video_gen import create_multi_stitched_video
import gradio as gr
import asyncio
import re
import boto3
import os
from botocore.config import Config
from dotenv import load_dotenv
import requests
import tempfile
import uuid
import base64
# Load environment variables from .env file
load_dotenv()

# Load R2/S3 environment secrets (Cloudflare R2 exposes an S3-compatible API)
R2_ENDPOINT = os.environ.get("R2_ENDPOINT")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")

# Validate that required environment variables are set; fail fast at import
# time rather than on the first upload.
if not all([R2_ENDPOINT, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY]):
    raise ValueError("Missing required R2 environment variables. Please check your .env file.")

# Gradio UI metadata
title = "AI-SL"
description = "Convert text to ASL!"
article = ("<p style='text-align: center'><a href='https://github.com/deenasun' "
           "target='_blank'>Deena Sun on Github</a></p>")

# Interface I/O: a document upload in; a JSON summary, the stitched ASL
# video, and an HTML download link out.
inputs = gr.File(label="Upload Document (pdf, txt, docx, or epub)")
outputs = [
    gr.JSON(label="Processing Results"),
    gr.Video(label="ASL Video Output"),
    gr.HTML(label="Download Link")
]

# Project helpers: document -> ASL gloss converter, and the vector-store
# client used to match gloss tokens to sign videos.
asl_converter = DocumentToASLConverter()
vectorizer = Vectorizer()

# boto3 S3 client pointed at the R2 endpoint, configured for SigV4 signing.
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4')
)
def clean_gloss_token(token):
    """Normalize a raw gloss token for video lookup.

    Drops square brackets and CR/LF characters, collapses any run of
    whitespace to a single space, trims the ends, and lowercases the
    result.
    """
    # One C-level pass removes the unwanted characters.
    without_noise = token.translate(str.maketrans('', '', '[]\n\r'))
    # split()/join collapses and strips all remaining whitespace runs.
    collapsed = ' '.join(without_noise.split())
    return collapsed.lower()
def upload_video_to_r2(video_path, bucket_name="ai-sl-videos"):
    """
    Upload a video file to R2 and return a public URL.

    Args:
        video_path: Local filesystem path of the video to upload.
        bucket_name: Target R2 bucket (defaults to "ai-sl-videos").

    Returns:
        The constructed object URL on success, or None on any failure
        (errors are printed, never raised).
    """
    try:
        # Generate a unique filename so repeated uploads never collide,
        # preserving the original extension.
        file_extension = os.path.splitext(video_path)[1]
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Upload to R2 via the module-level S3-compatible client.
        # NOTE(review): Cloudflare R2 has limited S3 ACL support — confirm
        # 'public-read' is honored here, or make the bucket public instead.
        with open(video_path, 'rb') as video_file:
            s3.upload_fileobj(
                video_file,
                bucket_name,
                unique_filename,
                ExtraArgs={'ACL': 'public-read'}
            )

        # Generate the public URL.
        # NOTE(review): the account API endpoint is generally not a public
        # serving domain on R2; an r2.dev or custom domain may be needed —
        # verify this URL is actually reachable without credentials.
        video_url = f"{R2_ENDPOINT}/{bucket_name}/{unique_filename}"
        print(f"Video uploaded to R2: {video_url}")
        return video_url
    except Exception as e:
        print(f"Error uploading video to R2: {e}")
        return None
def video_to_base64(video_path):
    """Encode a video file as an embeddable data-URI string.

    Reads the file's raw bytes and returns a
    "data:video/mp4;base64,..." string suitable for direct download
    links, or None when the file cannot be read or encoded.
    """
    try:
        with open(video_path, 'rb') as handle:
            payload = handle.read()
        encoded = base64.b64encode(payload).decode('utf-8')
        return f"data:video/mp4;base64,{encoded}"
    except Exception as exc:
        print(f"Error converting video to base64: {exc}")
        return None
def download_video_from_url(video_url):
    """Fetch a video from a public URL into a local temporary file.

    Streams the HTTP response in 8 KB chunks into a NamedTemporaryFile
    with an .mp4 suffix. Returns the local path on success, or None on
    any failure (errors are printed, never raised).
    """
    try:
        # Reserve a temp path; the handle is closed so we can reopen
        # it for writing below.
        handle = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        destination = handle.name
        handle.close()

        print(f"Downloading video from: {video_url}")
        response = requests.get(video_url, stream=True)
        response.raise_for_status()

        # Stream to disk chunk by chunk to avoid holding the whole
        # video in memory.
        with open(destination, 'wb') as out_file:
            for chunk in response.iter_content(chunk_size=8192):
                out_file.write(chunk)

        print(f"Video downloaded to: {destination}")
        return destination
    except Exception as exc:
        print(f"Error downloading video: {exc}")
        return None
def cleanup_temp_video(file_path):
    """Delete a temporary video file if it exists.

    Falsy paths are ignored; deletion errors are printed, never raised.
    """
    if not file_path:
        return
    try:
        if os.path.exists(file_path):
            os.unlink(file_path)
            print(f"Cleaned up: {file_path}")
    except Exception as exc:
        print(f"Error cleaning up file: {exc}")
async def parse_vectorize_and_search(file):
    """
    Full pipeline: document -> ASL gloss -> per-token video lookup ->
    stitched video uploaded to R2.

    Args:
        file: The uploaded document (Gradio file object/path) passed to
            the document-to-gloss converter.

    Returns:
        A 3-tuple matching the Gradio outputs: (result dict, final video
        URL or None, HTML download-link snippet or empty string).
    """
    print(file)
    gloss = asl_converter.convert_document(file)
    print("ASL", gloss)

    # Split by spaces and clean each token
    gloss_tokens = gloss.split()
    cleaned_tokens = []
    for token in gloss_tokens:
        cleaned = clean_gloss_token(token)
        if cleaned:  # Only add non-empty tokens
            cleaned_tokens.append(cleaned)
    print("Cleaned tokens:", cleaned_tokens)

    videos = []
    video_files = []  # Store local file paths for stitching
    for g in cleaned_tokens:
        print(f"Processing {g}")
        try:
            # Vector-search the sign-video store for this gloss token.
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)
                # Download the video
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            # Best-effort: skip tokens whose lookup or download fails.
            print(f"Error processing {g}: {e}")
            continue

    # Create stitched video if we have multiple videos
    stitched_video_path = None
    if len(video_files) > 1:
        try:
            print(f"Creating stitched video from {len(video_files)} videos...")
            stitched_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
            create_multi_stitched_video(video_files, stitched_video_path)
            print(f"Stitched video created: {stitched_video_path}")
        except Exception as e:
            print(f"Error creating stitched video: {e}")
            stitched_video_path = None
    elif len(video_files) == 1:
        # If only one video, just use it directly
        stitched_video_path = video_files[0]

    # Upload final video to R2 and get public URL
    final_video_url = None
    if stitched_video_path:
        final_video_url = upload_video_to_r2(stitched_video_path)
        # Clean up the local file after upload
        cleanup_temp_video(stitched_video_path)

    # Clean up individual video files after stitching
    for video_file in video_files:
        if video_file != stitched_video_path:  # Don't delete the final output
            cleanup_temp_video(video_file)

    # Create download link HTML
    download_html = ""
    if final_video_url:
        download_html = f"""
        <div style="text-align: center; padding: 20px;">
            <h3>Download Your ASL Video</h3>
            <a href="{final_video_url}" download="asl_video.mp4"
               style="background-color: #4CAF50; color: white;
                      padding: 12px 24px; text-decoration: none;
                      border-radius: 4px; display: inline-block;">
                Download Video
            </a>
            <p style="margin-top: 10px; color: #666;">
                <small>Right-click and "Save As" if the download doesn't
                start automatically</small>
            </p>
        </div>
        """
    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "final_video_url": final_video_url
    }, final_video_url, download_html
def parse_vectorize_and_search_sync(file):
    """Blocking wrapper so Gradio can invoke the async R2 pipeline."""
    pipeline = parse_vectorize_and_search(file)
    return asyncio.run(pipeline)
async def parse_vectorize_and_search_base64(file):
    """
    Alternative version that returns video as base64 data instead of
    uploading to R2.

    Duplicates the gloss/lookup/stitch pipeline of
    parse_vectorize_and_search; only the final delivery step differs.

    Args:
        file: The uploaded document (Gradio file object/path).

    Returns:
        A 3-tuple matching the Gradio outputs: (result dict, base64
        data-URI string or None, HTML download-link snippet or empty
        string).
    """
    print(file)
    gloss = asl_converter.convert_document(file)
    print("ASL", gloss)

    # Split by spaces and clean each token
    gloss_tokens = gloss.split()
    cleaned_tokens = []
    for token in gloss_tokens:
        cleaned = clean_gloss_token(token)
        if cleaned:  # Only add non-empty tokens
            cleaned_tokens.append(cleaned)
    print("Cleaned tokens:", cleaned_tokens)

    videos = []
    video_files = []  # Store local file paths for stitching
    for g in cleaned_tokens:
        print(f"Processing {g}")
        try:
            # Vector-search the sign-video store for this gloss token.
            result = await vectorizer.vector_query_from_supabase(query=g)
            print("result", result)
            if result.get("match", False):
                video_url = result["video_url"]
                videos.append(video_url)
                # Download the video
                local_path = download_video_from_url(video_url)
                if local_path:
                    video_files.append(local_path)
        except Exception as e:
            # Best-effort: skip tokens whose lookup or download fails.
            print(f"Error processing {g}: {e}")
            continue

    # Create stitched video if we have multiple videos
    stitched_video_path = None
    if len(video_files) > 1:
        try:
            print(f"Creating stitched video from {len(video_files)} videos...")
            stitched_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
            create_multi_stitched_video(video_files, stitched_video_path)
            print(f"Stitched video created: {stitched_video_path}")
        except Exception as e:
            print(f"Error creating stitched video: {e}")
            stitched_video_path = None
    elif len(video_files) == 1:
        # If only one video, just use it directly
        stitched_video_path = video_files[0]

    # Convert final video to base64
    final_video_base64 = None
    if stitched_video_path:
        final_video_base64 = video_to_base64(stitched_video_path)
        # Clean up the local file after conversion
        cleanup_temp_video(stitched_video_path)

    # Clean up individual video files after stitching
    for video_file in video_files:
        if video_file != stitched_video_path:  # Don't delete the final output
            cleanup_temp_video(video_file)

    # Create download link HTML for base64
    download_html = ""
    if final_video_base64:
        download_html = f"""
        <div style="text-align: center; padding: 20px;">
            <h3>Download Your ASL Video</h3>
            <a href="{final_video_base64}" download="asl_video.mp4"
               style="background-color: #4CAF50; color: white;
                      padding: 12px 24px; text-decoration: none;
                      border-radius: 4px; display: inline-block;">
                Download Video
            </a>
            <p style="margin-top: 10px; color: #666;">
                <small>Video is embedded directly - click to download</small>
            </p>
        </div>
        """
    return {
        "status": "success",
        "videos": videos,
        "video_count": len(videos),
        "gloss": gloss,
        "cleaned_tokens": cleaned_tokens,
        "video_format": "base64"
    }, final_video_base64, download_html
def parse_vectorize_and_search_base64_sync(file):
    """Blocking wrapper for the base64 variant of the pipeline."""
    pipeline = parse_vectorize_and_search_base64(file)
    return asyncio.run(pipeline)
# Wire up the Gradio app. Only the R2-upload pipeline is exposed here;
# parse_vectorize_and_search_base64_sync is defined above but not wired
# into any interface in this file.
intf = gr.Interface(
    fn=parse_vectorize_and_search_sync,
    inputs=inputs,
    outputs=outputs,
    title=title,
    description=description,
    article=article
)

# share=True asks Gradio to expose a temporary public gradio.live URL.
# Launch happens at import time — this module is intended to be run as
# the app entry point.
intf.launch(share=True)