import gradio as gr
from qa_engine import load_index, build_chain
from clipper import clip
from index_builder import build_index
from logging_config import logger
import os
import json
import time
import subprocess
# --- Module-level state shared across Gradio callbacks ---
store = None            # vector store built from the transcript index
qa_chain = None         # LangChain QA chain wired to `store`
SOURCE_AUDIO = None     # path to the audio file clips are cut from
model_name = "phi3"     # default to phi3, which runs locally via Ollama
index_loaded = False    # True once an index has been loaded or built
# --- load at startup (may not exist on first run) ---
try:
if os.path.exists("data"):
store, segments = load_index("data")
if store:
qa_chain = build_chain(store, model_name)
SOURCE_AUDIO = "downloads/audio.mp3"
index_loaded = True
logger.info("Successfully loaded existing index")
except Exception as e:
logger.warning("No existing index found or error loading index: %s. Upload a media file to build one.", str(e))
store = qa_chain = None
SOURCE_AUDIO = None
index_loaded = False
def _fmt(sec: float) -> str:
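    """Format a duration in seconds as HH:MM:SS.

    Doctest-style examples (illustrative values):

    >>> _fmt(3725)
    '01:02:05'
    >>> _fmt(59.9)
    '00:00:59'
    """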
h = int(sec // 3600)
m = int((sec % 3600) // 60)
s = int(sec % 60)
return f"{h:02d}:{m:02d}:{s:02d}"
def update_progress(progress: int, message: str):
    """Return an HTML snippet that invokes the client-side updateProgress() helper."""
    return f"<script>updateProgress({progress}, '{message}')</script>"
def handle(question: str):
    """Answer a question about the indexed media, yielding progress updates."""
    global qa_chain, store, SOURCE_AUDIO
    logger.info(f"Handling question: {question}")
    # This function is a generator (it yields progress updates below), so
    # early exits must *yield* their outputs: a plain `return value` inside a
    # generator never delivers anything to Gradio.
    if not store:
        msg = "⚠️ No vector store found. Please upload a media file first."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return
    if not qa_chain:
        msg = "⚠️ QA chain not initialized. Please select a model and try again."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return
    if not question.strip():
        msg = "⚠️ Please enter a question."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return
try:
# Update progress
logger.info("Processing question...")
yield None, "Processing your question...", update_progress(20, "Analyzing question...")
# Query the QA chain
logger.info(f"Querying QA chain with question: {question}")
result = qa_chain({"question": question}, return_only_outputs=True)
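        # Whichever chain build_chain() constructs is assumed to return a dict
        # shaped like {"answer": str, "source_documents": [Document, ...]};
        # the .get() calls below guard against missing keys.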
logger.info(f"QA chain result: {result}")
# Extract the answer and source documents
answer = result.get("answer", "No answer found.")
source_docs = result.get("source_documents", [])
logger.info(f"Found {len(source_docs)} source documents")
if not source_docs:
msg = "ℹ️ No relevant content found in the audio."
logger.info(msg)
yield None, msg, update_progress(100, "No results found")
return
# Get the first document's metadata for timestamp
metadata = source_docs[0].metadata
logger.info(f"Source document metadata: {metadata}")
start_time = float(metadata.get("start", 0))
end_time = start_time + 30 # 30-second clip
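        # Each indexed segment is assumed to carry its transcript timestamps
        # in metadata; "start" (in seconds) anchors a fixed 30-second clip
        # window expected to cover the answer.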
        # Format timestamps with the shared HH:MM:SS helper
        start_str = _fmt(start_time)
        end_str = _fmt(end_time)
logger.info(f"Extracting clip from {start_str} to {end_str}...")
yield None, f"Extracting clip from {start_str} to {end_str}...", update_progress(75, "Extracting audio...")
try:
logger.info(f"Calling clip() with source: {SOURCE_AUDIO}, start: {start_time}, end: {end_time}")
clip_path = clip(SOURCE_AUDIO, start_time, end_time)
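            # clip() is assumed to write the excerpt to a temporary file and
            # return its path; a missing file is treated as a failure below.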
logger.info(f"Clip created at: {clip_path}")
if not clip_path or not os.path.exists(clip_path):
error_msg = f"Failed to create clip at {clip_path}"
logger.error(error_msg)
raise FileNotFoundError(error_msg)
success_msg = f"🎧 Clip from {start_str} to {end_str}"
logger.info(success_msg)
yield clip_path, success_msg, update_progress(100, "Done!")
except Exception as e:
error_msg = f"❌ Error creating audio clip: {str(e)}"
logger.error(error_msg, exc_info=True)
yield None, error_msg, update_progress(0, "Error creating clip")
except Exception as e:
error_msg = f"❌ Error processing question: {str(e)}"
logger.error(error_msg, exc_info=True)
yield None, error_msg, update_progress(0, "Error occurred")
def upload_media(file, progress=gr.Progress()):
"""Build index from uploaded media and refresh QA chain."""
global SOURCE_AUDIO, qa_chain, store, model_name
if file is None:
logger.error("No file was uploaded")
return "❌ Error: No file was uploaded."
try:
progress(0.1, desc="Starting upload...")
# Get the actual file path
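        # Gradio may pass either a tempfile wrapper exposing .name or a plain
        # path string, depending on version and configuration; handle both.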
file_path = file.name if hasattr(file, 'name') else str(file)
logger.info(f"Processing uploaded file: {file_path}")
# Ensure the file exists
if not os.path.exists(file_path):
error_msg = f"File not found at path: {file_path}"
logger.error(error_msg)
return f"❌ Error: {error_msg}"
# Convert to MP3 if needed
if not file_path.lower().endswith('.mp3'):
progress(0.2, desc="Converting to MP3 format...")
logger.info("Converting file to MP3 format...")
base_name = os.path.splitext(file_path)[0]
audio_path = f"{base_name}.mp3"
try:
# Use ffmpeg to convert to MP3
cmd = [
'ffmpeg',
'-i', file_path, # Input file
'-q:a', '0', # Best quality
'-map', 'a', # Only audio
'-y', # Overwrite output file if it exists
audio_path # Output file
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Failed to convert file to MP3: {result.stderr}"
logger.error(error_msg)
return f"❌ Error: {error_msg}"
file_path = audio_path
logger.info(f"Successfully converted to MP3: {file_path}")
except Exception as e:
error_msg = f"Error during MP3 conversion: {str(e)}"
logger.error(error_msg, exc_info=True)
return f"❌ {error_msg}"
# Set the global audio source
SOURCE_AUDIO = file_path
# Create data directory if it doesn't exist
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)
# Build the index
progress(0.4, desc="Transcribing audio with Whisper (this may take a few minutes)...")
logger.info("Starting transcription and index building...")
try:
# Build the index from the audio file
store = build_index(file_path, data_dir)
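            # build_index() is assumed to transcribe with Whisper, embed the
            # transcript segments, persist them under data_dir, and return the
            # vector store (falsy if nothing was processed).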
if not store:
error_msg = "Failed to build index - no documents were processed"
logger.error(error_msg)
return f"❌ {error_msg}"
# Initialize QA chain with the model and store
progress(0.9, desc="Initializing QA system...")
logger.info("Initializing QA chain...")
qa_chain = build_chain(store, model_name)
if not qa_chain:
error_msg = "Failed to initialize QA chain"
logger.error(error_msg)
return f"❌ {error_msg}"
progress(1.0, desc="Ready!")
            success_msg = f"✅ Ready! Successfully processed {os.path.basename(file_path)}"
logger.info(success_msg)
return success_msg
except Exception as e:
error_msg = f"Error during index building: {str(e)}"
logger.error(error_msg, exc_info=True)
return f"❌ {error_msg}"
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
logger.error(error_msg, exc_info=True)
return f"❌ {error_msg}"
def tail_log(n: int = 200):
"""Return last n log entries pretty-printed JSON."""
path = os.path.join(os.path.dirname(__file__), "langchain_debug.jsonl")
if not os.path.exists(path):
return "{}" # empty JSON
with open(path, "r", encoding="utf-8") as f:
raw = f.readlines()[-n:]
objs = []
for ln in raw:
try:
objs.append(json.loads(ln))
except json.JSONDecodeError:
continue
return "\n\n".join(json.dumps(o, indent=2) for o in objs)
with gr.Blocks() as demo:
# Enable queue for async operations and generators
demo.queue()
with gr.Tab("Ask"):
gr.Markdown("# ClipQuery: Upload any audio/video and ask questions about it. ")
gr.Markdown("### The clip will be extracted from the point in the media where the answer most likely occurs.")
with gr.Row():
with gr.Column(scale=3):
# Model selection
model_dd = gr.Dropdown(
["flan-t5-base (HuggingFace)", "phi3 (Local - requires Ollama)", "tinyllama (Local - requires Ollama)"],
label="Select Model",
value="phi3 (Local - requires Ollama)"
)
with gr.Column(scale=2):
# Hugging Face Token input (initially hidden)
hf_token = gr.Textbox(
label="Hugging Face Token (required for flan-t5-base)",
type="password",
visible=False,
placeholder="Enter your Hugging Face token..."
)
def toggle_token_visibility(model_name):
return gr.update(visible="flan-t5-base" in model_name)
model_dd.change(
fn=toggle_token_visibility,
inputs=model_dd,
outputs=hf_token
)
            # The token box is created with visible=False, which already
            # matches the default (non-HF) model; calling
            # toggle_token_visibility() at build time would be a no-op because
            # the returned gr.update() is discarded outside an event handler.
uploader = gr.File(label="Upload audio/video", file_types=["audio", "video"])
status = gr.Markdown()
inp = gr.Textbox(label="Ask a question")
out_audio = gr.Audio()
ts_label = gr.Markdown()
# Progress tracker
with gr.Row():
progress = gr.HTML("""
<div style='width: 100%; margin: 10px 0;'>
<div style='display: flex; justify-content: space-between; margin-bottom: 5px;'>
<span id='status'>Ready</span>
<span id='progress'>0%</span>
</div>
<div style='height: 20px; background: #f0f0f0; border-radius: 10px; overflow: hidden;'>
<div id='progress-bar' style='height: 100%; width: 0%; background: #4CAF50; transition: width 0.3s;'></div>
</div>
</div>
""")
# JavaScript for progress updates
js = """
function updateProgress(progress, message) {
const bar = document.getElementById('progress-bar');
const percent = document.getElementById('progress');
const status = document.getElementById('status');
// Ensure progress is a number and has a default
const progressValue = Number(progress) || 0;
bar.style.width = progressValue + '%';
percent.textContent = progressValue + '%';
status.textContent = message || 'Processing...';
if (progressValue >= 100) {
bar.style.background = '#4CAF50';
status.textContent = 'Done!';
} else if (progressValue >= 75) {
bar.style.background = '#2196F3';
} else if (progressValue >= 50) {
bar.style.background = '#FFC107';
} else if (progressValue >= 25) {
bar.style.background = '#FF9800';
} else {
bar.style.background = '#f44336';
}
}
// Initialize on load
document.addEventListener('DOMContentLoaded', function() {
updateProgress(0, 'Ready');
});
"""
demo.load(fn=None, inputs=None, outputs=None, js=js)
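    # Passing js= to demo.load() runs the snippet client-side on page load,
    # defining updateProgress() for the <script> fragments that handle()
    # emits into the progress gr.HTML component.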
def _on_model_change(label, token):
global model_name, qa_chain, store
        name = label.split()[0]  # e.g. "phi3 (Local - requires Ollama)" -> "phi3"
if name == model_name:
return "" # No change needed
# Check if this is a local model that needs Ollama
if name in ('phi3', 'tinyllama'):
try:
import requests
response = requests.get('http://localhost:11434', timeout=5)
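                # Ollama's root endpoint answers 200 ("Ollama is running")
                # when the server is up; any other status is treated as down.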
if response.status_code != 200:
raise ConnectionError("Ollama server not running. Please start it first.")
except Exception as e:
return f"❌ Error: {str(e)}. Please make sure Ollama is running."
if store is None and name != "flan-t5-base":
return "⚠️ Please upload a media file before changing models."
try:
if name == "flan-t5-base" and not token:
return "⚠️ Please enter your Hugging Face token to use flan-t5-base. Get one at https://huggingface.co/settings/tokens"
# Only pass the token if using flan-t5-base
hf_token = token if name == "flan-t5-base" else None
qa_chain = build_chain(store, name, hf_token)
model_name = name # Update the current model name
return f"βœ… Switched to {label}"
except Exception as e:
return f"❌ Failed to switch model: {str(e)}"
model_dd.change(
fn=_on_model_change,
inputs=[model_dd, hf_token],
outputs=status
)
uploader.change(
fn=upload_media,
inputs=uploader,
outputs=status,
api_name="upload_media"
)
inp.submit(
fn=handle,
inputs=inp,
outputs=[out_audio, ts_label, progress],
show_progress=False
)
with gr.Tab("Debug Log"):
log_box = gr.Textbox(label="Application Logs", lines=25, max_lines=25, interactive=False)
refresh_btn = gr.Button("Refresh Logs")
def refresh_logs():
from logging_config import get_logs
logs = get_logs()
return f"""
===== LATEST LOGS =====
{logs[-5000:] if len(logs) > 5000 else logs}
======================
"""
refresh_btn.click(refresh_logs, None, log_box)
demo.load(refresh_logs, None, log_box, every=5)
if __name__ == "__main__":
demo.launch()