# -*- coding: utf-8 -*-
# Install required libraries if running outside Colab
# !pip install gradio yt-dlp moviepy pillow SpeechRecognition requests llama-index llama-index-vector-stores-lancedb llama-index-embeddings-huggingface lancedb
import gradio as gr
from moviepy import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from PIL import Image
import os
import shutil
import json
import yt_dlp
import requests
import base64
from io import BytesIO

# Video processing helpers: download_video, video_to_images, video_to_audio, audio_to_text, prepare_all_videos

def plot_images(image_paths):
    """Collect up to 7 existing image paths so they can be shown in the gallery."""
    img_files = []
    for img_path in image_paths:
        if os.path.isfile(img_path):
            img_files.append(img_path)
            if len(img_files) >= 7:
                break
    return img_files

def download_video(video_url, output_video_path="./video_data/"):
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "merge_output_format": "mp4",
        "outtmpl": os.path.join(output_video_path, "input_vid.mp4"),
        "noplaylist": True,
        "quiet": False,
        # Uncomment and set your cookie file path if required
        # "cookiefile": "cookies.txt",
    }
    Path(output_video_path).mkdir(parents=True, exist_ok=True)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        info = ydl.sanitize_info(info)
    return {
        "title": info.get("title"),
        "uploader": info.get("uploader"),
        "views": info.get("view_count"),
    }
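
# Example usage (a sketch, commented out so nothing is downloaded at import time;
# the URL below is a placeholder, not part of the original app):
# video_meta = download_video("https://www.youtube.com/watch?v=VIDEO_ID", "./video_data/")
# print(video_meta)  # {'title': ..., 'uploader': ..., 'views': ...}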

def video_to_images(video_path, output_folder):
    """Extract frames from the video (fps=0.2 means one frame every 5 seconds)."""
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    clip = VideoFileClip(video_path)
    clip.write_images_sequence(
        os.path.join(output_folder, "frame%04d.png"), fps=0.2
    )
    clip.close()

def video_to_audio(video_path, output_audio_path):
    """Extract the audio track from the video and save it to output_audio_path."""
    clip = VideoFileClip(video_path)
    clip.audio.write_audiofile(output_audio_path)
    clip.close()

def audio_to_text(audio_path):
    """Transcribe a WAV file with the Google Speech Recognition web API; return None on failure."""
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data)
        return text
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results: {e}")
    return None

def prepare_all_videos(
    video_folder="./video_data/",
    output_folder="./mixed_data/"
):
    """
    Processes all video files in video_folder, extracting images and text for each,
    and stores them in unique subfolders under output_folder.
    Returns a list of metadata dicts for all videos.
    """
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    video_files = [f for f in os.listdir(video_folder) if f.lower().endswith((".mp4", ".mov", ".avi", ".mkv"))]
    all_metadata = []
    for video_file in video_files:
        video_path = os.path.join(video_folder, video_file)
        video_name = Path(video_file).stem
        video_output_folder = os.path.join(output_folder, video_name)
        Path(video_output_folder).mkdir(parents=True, exist_ok=True)
        audio_path = os.path.join(video_output_folder, "output_audio.wav")
        # Extract images and audio
        video_to_images(video_path, video_output_folder)
        video_to_audio(video_path, audio_path)
        # Transcribe audio
        text_data = audio_to_text(audio_path)
        text_path = os.path.join(video_output_folder, "output_text.txt")
        with open(text_path, "w") as file:
            file.write(text_data if text_data else "")
        os.remove(audio_path)
        # Placeholder metadata; enrich as needed (e.g. with download_video()'s return value)
        meta = {
            "title": video_name,
            "uploader": "unknown",
            "views": "unknown",
            "file": video_file,
        }
        all_metadata.append({"meta": meta, "text": text_data, "folder": video_output_folder})
    return all_metadata
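
# Example usage (a sketch; assumes at least one video has already been placed in
# ./video_data/, e.g. via download_video above — commented out to avoid heavy work at import):
# videos = prepare_all_videos("./video_data/", "./mixed_data/")
# for v in videos:
#     print(v["meta"]["title"], "->", v["folder"])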

from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.lancedb import LanceDBVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings

def create_vector_db_for_all(image_txt_root_folder: str):
    """
    Loads all subfolders in image_txt_root_folder as documents for the vector DB.
    """
    text_store = LanceDBVectorStore(uri="lancedb", table_name="text_collection")
    image_store = LanceDBVectorStore(uri="lancedb", table_name="image_collection")
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, image_store=image_store
    )
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    # Load all subfolders as documents
    documents = []
    for subfolder in Path(image_txt_root_folder).iterdir():
        if subfolder.is_dir():
            documents.extend(SimpleDirectoryReader(str(subfolder)).load_data())
    index = MultiModalVectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
    )
    retriever_engine = index.as_retriever(
        similarity_top_k=2, image_similarity_top_k=3
    )
    return retriever_engine

from llama_index.core.schema import ImageNode

def retrieve(retriever_engine, query_str):
    retrieval_results = retriever_engine.retrieve(query_str)
    retrieved_image = []
    retrieved_text = []
    for res_node in retrieval_results:
        if isinstance(res_node.node, ImageNode):
            retrieved_image.append(res_node.node.metadata["file_path"])
        else:
            retrieved_text.append(res_node.text)
    return retrieved_image, retrieved_text
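
# Example usage (a sketch; assumes ./mixed_data/ was populated by prepare_all_videos
# and the embedding model can be downloaded — commented out to avoid building the
# index at import time):
# engine = create_vector_db_for_all("./mixed_data/")
# imgs, txts = retrieve(engine, "Best island in Maldives")
# print(len(imgs), "frames,", len(txts), "text chunks retrieved")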

qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved context from the video, "
    "accurately and precisely answer the query without any additional prior knowledge.\n"
    "Please ensure honesty and responsibility, refraining from any racist or sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str}\n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)

# Define model values and their corresponding labels
available_models = [
    {"value": "meta-llama/llama-4-maverick:free", "label": "Llama"},
    {"value": "qwen/qwen2.5-vl-72b-instruct:free", "label": "Qwen"},
    {"value": "google/gemma-3-27b-it:free", "label": "Gemma"},
    {"value": "moonshotai/kimi-vl-a3b-thinking:free", "label": "Kimi"},
    {"value": "google/gemini-2.0-flash-exp:free", "label": "Gemini"},
    # Add more models here if needed
]

# Lookup tables to map between model values and labels
model_value_to_label = {item["value"]: item["label"] for item in available_models}
model_label_to_value = {item["label"]: item["value"] for item in available_models}

# Gradio interface function
def gradio_chat(query, model_label):
    output_video_path = "./video_data/"
    output_folder = "./mixed_data/"
    try:
        # Process all videos (note: frames/audio are re-extracted and the index is
        # rebuilt on every query; cache these steps if responses feel slow)
        all_metadata = prepare_all_videos(output_video_path, output_folder)
        # Combine metadata for all videos
        metadata_str = json.dumps([item["meta"] for item in all_metadata])
        retriever_engine = create_vector_db_for_all(output_folder)
        img, txt = retrieve(retriever_engine=retriever_engine, query_str=query)
        context_str = "".join(txt)
        prompt = qa_tmpl_str.format(
            context_str=context_str, query_str=query, metadata_str=metadata_str
        )
        OPENROUTER_API_KEY = os.environ["OPENROUTER_API_KEY"]
        headers = {
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
            "HTTP-Referer": "<YOUR_SITE_URL>",
            "X-Title": "<YOUR_SITE_NAME>",
        }
        model_name = model_label_to_value.get(model_label, available_models[0]["value"])
        messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
        image_paths = []
        for img_path in img:
            try:
                # Convert to RGB before JPEG encoding so RGBA/paletted frames don't fail
                image = Image.open(img_path).convert("RGB")
                buffered = BytesIO()
                image.save(buffered, format="JPEG")
                img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
                messages[0]["content"].append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                })
                image_paths.append(img_path)
            except Exception as e:
                print(f"Error loading image {img_path}: {e}")
        data = {
            "model": model_name,
            "messages": messages,
        }
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            data=json.dumps(data),
        )
        response.raise_for_status()
        result_text = response.json()["choices"][0]["message"]["content"]
        return result_text, image_paths
    except Exception as e:
        return f"Error: {str(e)}", []

# Gradio UI
gradio_ui = gr.Interface(
    fn=gradio_chat,
    inputs=[
        gr.Textbox(label="", placeholder="Try: Best island in Maldives"),
        gr.Dropdown(
            choices=[item["label"] for item in available_models],
            value=available_models[0]["label"],
            label="Select Model:",
        ),
    ],
    outputs=[
        gr.Textbox(label="Vega Response:"),
        gr.Gallery(label="Relevant Images", allow_preview=True),
    ],
    title="",
    description="",
    theme=gr.themes.Default(primary_hue="sky"),
    css="footer {visibility: hidden}",
    deep_link=False,
)

if __name__ == "__main__":
    gradio_ui.launch(share=False)