import os
import json
import time
import subprocess

import gradio as gr

from qa_engine import load_index, build_chain
from clipper import clip
from index_builder import build_index
from logging_config import logger

# --- global state shared across Gradio callbacks ---
store = None          # vector store built from the transcript
qa_chain = None       # LangChain QA chain over the store
SOURCE_AUDIO = None   # path to the audio file clips are cut from
model_name = "phi3"   # Default to phi3 which is local (served via Ollama)
index_loaded = False  # True when a prebuilt index was found at startup

# --- load at startup (may not exist on first run) ---
try:
    if os.path.exists("data"):
        store, segments = load_index("data")
        if store:
            qa_chain = build_chain(store, model_name)
            SOURCE_AUDIO = "downloads/audio.mp3"
            index_loaded = True
            logger.info("Successfully loaded existing index")
except Exception as e:
    logger.warning(
        "No existing index found or error loading index: %s. "
        "Upload a media file to build one.",
        str(e),
    )
    store = qa_chain = None
    SOURCE_AUDIO = None
    index_loaded = False


def _fmt(sec: float) -> str:
    """Format a duration in seconds as HH:MM:SS."""
    h = int(sec // 3600)
    m = int((sec % 3600) // 60)
    s = int(sec % 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def update_progress(progress: int, message: str) -> str:
    """Render a small HTML progress bar with a status message.

    Fix: the previous implementation returned ``f""`` — an empty string that
    ignored both arguments — so the progress panel never showed anything.
    """
    pct = max(0, min(100, int(progress)))
    return (
        '<div style="width:100%;background:#eee;border-radius:4px;">'
        f'<div style="width:{pct}%;background:#4caf50;height:8px;'
        'border-radius:4px;"></div></div>'
        f"<small>{message}</small>"
    )


def handle(question: str):
    """Answer *question* from the indexed media and yield a matching clip.

    Generator for Gradio streaming: yields ``(clip_path, status_markdown,
    progress_html)`` tuples as processing advances.
    """
    global qa_chain, store, SOURCE_AUDIO

    logger.info("Handling question: %s", question)

    # Guard clauses. NOTE: inside a generator, `return value` is invisible to
    # Gradio (it only sets StopIteration.value) — yield the message, then stop.
    if not store:
        msg = "⚠️ No vector store found. Please upload a media file first."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return
    if not qa_chain:
        msg = "⚠️ QA chain not initialized. Please select a model and try again."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return
    if not question.strip():
        msg = "⚠️ Please enter a question."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return

    try:
        logger.info("Processing question...")
        yield None, "Processing your question...", update_progress(20, "Analyzing question...")

        # Query the QA chain.
        logger.info("Querying QA chain with question: %s", question)
        result = qa_chain({"question": question}, return_only_outputs=True)
        logger.info("QA chain result: %s", result)

        # NOTE(review): the textual answer (result["answer"]) is currently
        # unused — only the source clip is surfaced to the user.
        source_docs = result.get("source_documents", [])
        logger.info("Found %d source documents", len(source_docs))

        if not source_docs:
            msg = "ℹ️ No relevant content found in the audio."
            logger.info(msg)
            yield None, msg, update_progress(100, "No results found")
            return

        # Use the best-matching segment's start timestamp for the clip.
        metadata = source_docs[0].metadata
        logger.info("Source document metadata: %s", metadata)
        start_time = float(metadata.get("start", 0))
        end_time = start_time + 30  # 30-second clip

        # Format timestamps as M:SS for display.
        start_str = f"{int(start_time // 60)}:{int(start_time % 60):02d}"
        end_str = f"{int(end_time // 60)}:{int(end_time % 60):02d}"

        logger.info("Extracting clip from %s to %s...", start_str, end_str)
        yield (
            None,
            f"Extracting clip from {start_str} to {end_str}...",
            update_progress(75, "Extracting audio..."),
        )

        try:
            logger.info(
                "Calling clip() with source: %s, start: %s, end: %s",
                SOURCE_AUDIO, start_time, end_time,
            )
            clip_path = clip(SOURCE_AUDIO, start_time, end_time)
            logger.info("Clip created at: %s", clip_path)

            if not clip_path or not os.path.exists(clip_path):
                error_msg = f"Failed to create clip at {clip_path}"
                logger.error(error_msg)
                raise FileNotFoundError(error_msg)

            success_msg = f"🎧 Clip from {start_str} to {end_str}"
            logger.info(success_msg)
            yield clip_path, success_msg, update_progress(100, "Done!")
        except Exception as e:
            error_msg = f"❌ Error creating audio clip: {str(e)}"
            logger.error(error_msg, exc_info=True)
            yield None, error_msg, update_progress(0, "Error creating clip")
    except Exception as e:
        error_msg = f"❌ Error processing question: {str(e)}"
        logger.error(error_msg, exc_info=True)
        yield None, error_msg, update_progress(0, "Error occurred")


def upload_media(file, progress=gr.Progress()):
    """Build index from uploaded media and refresh QA chain.

    Returns a status string (✅ on success, ❌ with a reason on failure).
    Side effects: sets the module globals SOURCE_AUDIO, store and qa_chain.
    """
    global SOURCE_AUDIO, qa_chain, store, model_name

    if file is None:
        logger.error("No file was uploaded")
        return "❌ Error: No file was uploaded."

    try:
        progress(0.1, desc="Starting upload...")

        # gr.File may hand us a tempfile wrapper or a plain path string.
        file_path = file.name if hasattr(file, 'name') else str(file)
        logger.info("Processing uploaded file: %s", file_path)

        if not os.path.exists(file_path):
            error_msg = f"File not found at path: {file_path}"
            logger.error(error_msg)
            return f"❌ Error: {error_msg}"

        # Convert to MP3 if needed.
        if not file_path.lower().endswith('.mp3'):
            progress(0.2, desc="Converting to MP3 format...")
            logger.info("Converting file to MP3 format...")
            base_name = os.path.splitext(file_path)[0]
            audio_path = f"{base_name}.mp3"
            try:
                # ffmpeg in list form (no shell) to avoid injection issues.
                cmd = [
                    'ffmpeg',
                    '-i', file_path,  # input file
                    '-q:a', '0',      # best VBR quality
                    '-map', 'a',      # audio stream only
                    '-y',             # overwrite output file if it exists
                    audio_path,       # output file
                ]
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode != 0:
                    error_msg = f"Failed to convert file to MP3: {result.stderr}"
                    logger.error(error_msg)
                    return f"❌ Error: {error_msg}"
                file_path = audio_path
                logger.info("Successfully converted to MP3: %s", file_path)
            except Exception as e:
                error_msg = f"Error during MP3 conversion: {str(e)}"
                logger.error(error_msg, exc_info=True)
                return f"❌ {error_msg}"

        # Point the clipping source at the (possibly converted) audio file.
        SOURCE_AUDIO = file_path

        # Create data directory if it doesn't exist.
        data_dir = "data"
        os.makedirs(data_dir, exist_ok=True)

        progress(0.4, desc="Transcribing audio with Whisper (this may take a few minutes)...")
        logger.info("Starting transcription and index building...")
        try:
            # Build the index from the audio file.
            store = build_index(file_path, data_dir)
            if not store:
                error_msg = "Failed to build index - no documents were processed"
                logger.error(error_msg)
                return f"❌ {error_msg}"

            # Initialize QA chain with the model and store.
            progress(0.9, desc="Initializing QA system...")
            logger.info("Initializing QA chain...")
            qa_chain = build_chain(store, model_name)
            if not qa_chain:
                error_msg = "Failed to initialize QA chain"
                logger.error(error_msg)
                return f"❌ {error_msg}"

            progress(1.0, desc="Ready!")
            success_msg = f"✅ Ready! Successfully processed {os.path.basename(file_path)}"
            logger.info(success_msg)
            return success_msg
        except Exception as e:
            error_msg = f"Error during index building: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return f"❌ {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return f"❌ {error_msg}"
def tail_log(n: int = 200):
    """Return last n log entries pretty-printed JSON.

    Reads ``langchain_debug.jsonl`` next to this module; lines that are not
    valid JSON are skipped. Returns ``"{}"`` when the log file is absent.
    """
    path = os.path.join(os.path.dirname(__file__), "langchain_debug.jsonl")
    if not os.path.exists(path):
        return "{}"  # empty JSON

    # deque(maxlen=n) keeps only the last n lines while streaming the file,
    # instead of materializing every line with readlines().
    from collections import deque

    with open(path, "r", encoding="utf-8") as f:
        raw = deque(f, maxlen=n)

    objs = []
    for ln in raw:
        try:
            objs.append(json.loads(ln))
        except json.JSONDecodeError:
            continue
    return "\n\n".join(json.dumps(o, indent=2) for o in objs)
") gr.Markdown("### The clip will be extracted from the point in the media where the answer most likely occurs.") with gr.Row(): with gr.Column(scale=3): # Model selection model_dd = gr.Dropdown( ["flan-t5-base (HuggingFace)", "phi3 (Local - requires Ollama)", "tinyllama (Local - requires Ollama)"], label="Select Model", value="phi3 (Local - requires Ollama)" ) with gr.Column(scale=2): # Hugging Face Token input (initially hidden) hf_token = gr.Textbox( label="Hugging Face Token (required for flan-t5-base)", type="password", visible=False, placeholder="Enter your Hugging Face token..." ) def toggle_token_visibility(model_name): return gr.update(visible="flan-t5-base" in model_name) model_dd.change( fn=toggle_token_visibility, inputs=model_dd, outputs=hf_token ) # Initial token visibility check toggle_token_visibility(model_dd.value) uploader = gr.File(label="Upload audio/video", file_types=["audio", "video"]) status = gr.Markdown() inp = gr.Textbox(label="Ask a question") out_audio = gr.Audio() ts_label = gr.Markdown() # Progress tracker with gr.Row(): progress = gr.HTML("""