# VegaTest / app.py
# -*- coding: utf-8 -*-
# Install required libraries if running outside Colab
# !pip install gradio yt-dlp moviepy pillow speechrecognition llama-index llama-index-vector-stores-lancedb llama-index-embeddings-huggingface lancedb requests google-generativeai
import gradio as gr
from moviepy import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from PIL import Image
import os
import json
import yt_dlp
import requests
import base64
from io import BytesIO
# Pipeline helpers: download_video, video_to_images, video_to_audio, audio_to_text, prepare_all_videos, ...
def plot_images(image_paths):
    """Collect up to 7 existing image files from image_paths."""
    img_files = []
    for img_path in image_paths:
        if os.path.isfile(img_path):
            img_files.append(img_path)
            if len(img_files) >= 7:
                break
    return img_files
def download_video(video_url, output_video_path="./video_data/"):
    """Download a video with yt-dlp into output_video_path and return basic metadata."""
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "merge_output_format": "mp4",
        "outtmpl": os.path.join(output_video_path, "input_vid.mp4"),
        "noplaylist": True,
        "quiet": False,
        # Uncomment and set your cookie file path if required
        # "cookiefile": "cookies.txt",
    }
    Path(output_video_path).mkdir(parents=True, exist_ok=True)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        info = ydl.sanitize_info(info)
    return {
        "title": info.get("title"),
        "uploader": info.get("uploader"),
        "views": info.get("view_count"),
    }
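# Example usage (illustrative only, not executed here; the URL is a placeholder):
#   meta = download_video("https://www.youtube.com/watch?v=<VIDEO_ID>")
#   print(meta)  # -> {"title": ..., "uploader": ..., "views": ...}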
def video_to_images(video_path, output_folder):
    """Extract frames from the video into output_folder at 0.2 fps (one frame every 5 seconds)."""
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    clip = VideoFileClip(video_path)
    clip.write_images_sequence(
        os.path.join(output_folder, "frame%04d.png"), fps=0.2
    )

def video_to_audio(video_path, output_audio_path):
    """Extract the audio track from the video and write it to output_audio_path."""
    clip = VideoFileClip(video_path)
    audio = clip.audio
    audio.write_audiofile(output_audio_path)
def audio_to_text(audio_path):
    """Transcribe a WAV file with the Google Web Speech API; return None if transcription fails."""
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data)
        return text
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results: {e}")
    return None
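# Sketch of the per-clip extraction steps that prepare_all_videos applies below
# (illustrative only; the paths are assumptions and nothing here runs at import time):
#   video_to_images("./video_data/input_vid.mp4", "./mixed_data/input_vid/")
#   video_to_audio("./video_data/input_vid.mp4", "./mixed_data/input_vid/output_audio.wav")
#   transcript = audio_to_text("./mixed_data/input_vid/output_audio.wav")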
def prepare_all_videos(
    video_folder="./video_data/",
    output_folder="./mixed_data/"
):
    """
    Process every video file in video_folder, extracting frames and a transcript for each,
    and store them in a unique subfolder under output_folder.
    Returns a list of metadata dicts, one per video.
    """
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    video_files = [f for f in os.listdir(video_folder) if f.lower().endswith(('.mp4', '.mov', '.avi', '.mkv'))]
    all_metadata = []
    for video_file in video_files:
        video_path = os.path.join(video_folder, video_file)
        video_name = Path(video_file).stem
        video_output_folder = os.path.join(output_folder, video_name)
        Path(video_output_folder).mkdir(parents=True, exist_ok=True)
        audio_path = os.path.join(video_output_folder, "output_audio.wav")
        # Extract frames and audio
        video_to_images(video_path, video_output_folder)
        video_to_audio(video_path, audio_path)
        # Transcribe the audio, then remove the intermediate WAV file
        text_data = audio_to_text(audio_path)
        text_path = os.path.join(video_output_folder, "output_text.txt")
        with open(text_path, "w") as file:
            file.write(text_data if text_data else "")
        os.remove(audio_path)
        # Placeholder metadata; enhance as needed (e.g. with download_video's return value)
        meta = {
            "title": video_name,
            "uploader": "unknown",
            "views": "unknown",
            "file": video_file
        }
        all_metadata.append({"meta": meta, "text": text_data, "folder": video_output_folder})
    return all_metadata
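# Shape of the returned list, one entry per processed clip (values are illustrative):
#   [{"meta": {"title": "input_vid", "uploader": "unknown", "views": "unknown", "file": "input_vid.mp4"},
#     "text": "<transcript or None>",
#     "folder": "./mixed_data/input_vid"}]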
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.lancedb import LanceDBVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
def create_vector_db_for_all(image_txt_root_folder: str):
    """
    Load every subfolder of image_txt_root_folder as documents, index them in a
    multimodal LanceDB-backed vector store, and return a retriever that yields up to
    2 text nodes and 3 image nodes per query.
    """
    text_store = LanceDBVectorStore(uri="lancedb", table_name="text_collection")
    image_store = LanceDBVectorStore(uri="lancedb", table_name="image_collection")
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, image_store=image_store
    )
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    # Load all subfolders as documents
    documents = []
    for subfolder in Path(image_txt_root_folder).iterdir():
        if subfolder.is_dir():
            documents.extend(SimpleDirectoryReader(str(subfolder)).load_data())
    index = MultiModalVectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
    )
    retriever_engine = index.as_retriever(
        similarity_top_k=2, image_similarity_top_k=3
    )
    return retriever_engine
from llama_index.core.schema import ImageNode
def retrieve(retriever_engine, query_str):
    """Run the query against the retriever and split the results into image paths and text chunks."""
    retrieval_results = retriever_engine.retrieve(query_str)
    retrieved_image = []
    retrieved_text = []
    for res_node in retrieval_results:
        if isinstance(res_node.node, ImageNode):
            retrieved_image.append(res_node.node.metadata["file_path"])
        else:
            retrieved_text.append(res_node.text)
    return retrieved_image, retrieved_text
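# Minimal sketch of building and querying the index (assumes ./mixed_data/ has already
# been populated by prepare_all_videos; shown for illustration, not executed here):
#   retriever = create_vector_db_for_all("./mixed_data/")
#   images, texts = retrieve(retriever, "Best island in Maldives")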
qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved context from the video, "
    "accurately and precisely answer the query without any additional prior knowledge.\n"
    "Please ensure honesty and responsibility, refraining from any racist or sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str}\n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)
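# The template is filled with str.format() in gradio_chat, e.g. (illustrative values):
#   qa_tmpl_str.format(context_str="<retrieved text>", metadata_str="[{...}]", query_str="Best island in Maldives")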
# Define model values and their corresponding display labels
available_models = [
    {"value": "meta-llama/llama-4-maverick:free", "label": "Llama"},
    {"value": "qwen/qwen2.5-vl-72b-instruct:free", "label": "Qwen"},
    {"value": "google/gemma-3-27b-it:free", "label": "Gemma"},
    {"value": "moonshotai/kimi-vl-a3b-thinking:free", "label": "Kimi"},
    {"value": "google/gemini-2.0-flash-exp:free", "label": "Gemini"},
    # Add more models here if needed
]
# Helpers to map between model values and display labels
model_value_to_label = {item["value"]: item["label"] for item in available_models}
model_label_to_value = {item["label"]: item["value"] for item in available_models}
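# Lookup example: model_label_to_value["Qwen"] -> "qwen/qwen2.5-vl-72b-instruct:free"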
# Gradio interface function
def gradio_chat(query, model_label):
    output_video_path = "./video_data/"
    output_folder = "./mixed_data/"
    try:
        # Process all videos (note: extraction, transcription and indexing rerun on every query)
        all_metadata = prepare_all_videos(output_video_path, output_folder)
        # Combine metadata for all videos
        metadata_str = json.dumps([item["meta"] for item in all_metadata])
        retriever_engine = create_vector_db_for_all(output_folder)
        img, txt = retrieve(retriever_engine=retriever_engine, query_str=query)
        context_str = "".join(txt)
        prompt = qa_tmpl_str.format(
            context_str=context_str, query_str=query, metadata_str=metadata_str
        )
        OPENROUTER_API_KEY = os.environ["OPENROUTER_API_KEY"]
        headers = {
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
            "HTTP-Referer": "<YOUR_SITE_URL>",
            "X-Title": "<YOUR_SITE_NAME>",
        }
        model_name = model_label_to_value.get(model_label, available_models[0]["value"])
        messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
        image_paths = []
        for img_path in img:
            try:
                # Convert to RGB first so PNG frames (possibly RGBA) can be saved as JPEG
                image = Image.open(img_path).convert("RGB")
                buffered = BytesIO()
                image.save(buffered, format="JPEG")
                img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
                messages[0]["content"].append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
                })
                image_paths.append(img_path)
            except Exception as e:
                print(f"Error loading image {img_path}: {e}")
        data = {
            "model": model_name,
            "messages": messages,
        }
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            data=json.dumps(data)
        )
        response.raise_for_status()
        result_text = response.json()["choices"][0]["message"]["content"]
        return result_text, image_paths
    except Exception as e:
        return f"Error: {str(e)}", []
# Gradio UI
gradio_ui = gr.Interface(
    fn=gradio_chat,
    inputs=[
        gr.Textbox(label="", placeholder="Try: Best island in Maldives"),
        gr.Dropdown(
            choices=[item["label"] for item in available_models],
            value=available_models[0]["label"],
            label="Select Model:"
        )
    ],
    outputs=[
        gr.Textbox(label="Vega Response:"),
        gr.Gallery(label="Relevant Images", allow_preview=True),
    ],
    title="",
    description="",
    theme=gr.themes.Default(primary_hue="sky"),
    css="footer {visibility: hidden}",
    deep_link=False
)
if __name__ == "__main__":
    gradio_ui.launch(share=False)