|
import gradio as gr |
|
from qa_engine import load_index, build_chain |
|
from clipper import clip |
|
from index_builder import build_index |
|
from logging_config import logger |
|
import os |
|
import json |
|
import time |
|
import subprocess |
|
|
|
|
|
# --- Module-level application state ---------------------------------------
# Shared by (and rebound from) the Gradio callbacks below via `global`.
store = None  # vector store over the media transcript (None until indexed)
qa_chain = None  # QA chain built from `store` and the selected model
SOURCE_AUDIO = None  # path to the audio file that clips are cut from
model_name = "phi3"  # currently selected LLM backend
index_loaded = False  # True when a pre-built index was loaded at startup

# Best-effort warm start: if a previously built index exists under ./data,
# load it and build the QA chain so questions work without re-uploading.
try:
    if os.path.exists("data"):
        store, segments = load_index("data")
        if store:
            qa_chain = build_chain(store, model_name)
            # NOTE(review): assumes the existing index was built from this
            # exact file — confirm "downloads/audio.mp3" matches the index.
            SOURCE_AUDIO = "downloads/audio.mp3"
            index_loaded = True
            logger.info("Successfully loaded existing index")
except Exception as e:
    # Any failure here is non-fatal: start with a clean slate and let the
    # user upload media to build a fresh index.
    logger.warning("No existing index found or error loading index: %s. Upload a media file to build one.", str(e))
    store = qa_chain = None
    SOURCE_AUDIO = None
    index_loaded = False
|
|
|
|
|
def _fmt(sec: float) -> str: |
|
h = int(sec // 3600) |
|
m = int((sec % 3600) // 60) |
|
s = int(sec % 60) |
|
return f"{h:02d}:{m:02d}:{s:02d}" |
|
|
|
|
|
def update_progress(progress: int, message: str) -> str:
    """Return an inline <script> snippet calling the page's updateProgress().

    Both arguments are serialized with json.dumps so that a message
    containing quotes or backslashes cannot break out of (or corrupt) the
    generated JavaScript call — the previous bare f-string interpolation
    produced invalid JS for any message containing an apostrophe.

    NOTE(review): Gradio sanitizes HTML output by default, so this script
    may not actually execute in the browser — confirm against the Gradio
    version in use.
    """
    return f"<script>updateProgress({json.dumps(progress)}, {json.dumps(message)})</script>"
|
|
|
|
|
def handle(question: str):
    """Answer a question about the indexed media and stream back a clip.

    Gradio generator handler: yields (audio_path, status_markdown,
    progress_html) tuples so the UI can show intermediate status.

    Bug fix: the early guard clauses previously used `return value`. In a
    generator function that only sets StopIteration.value — Gradio never
    receives the tuple, so the warning messages were silently dropped.
    They now `yield` the message and then return.
    """
    global qa_chain, store, SOURCE_AUDIO

    logger.info("Handling question: %s", question)

    # Guard clauses: surface missing prerequisites to the UI, then stop.
    if not store:
        msg = "⚠️ No vector store found. Please upload a media file first."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return

    if not qa_chain:
        msg = "⚠️ QA chain not initialized. Please select a model and try again."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return

    if not question.strip():
        msg = "⚠️ Please enter a question."
        logger.warning(msg)
        yield None, msg, update_progress(0, "Waiting for input...")
        return

    try:
        logger.info("Processing question...")
        yield None, "Processing your question...", update_progress(20, "Analyzing question...")

        logger.info("Querying QA chain with question: %s", question)
        result = qa_chain({"question": question}, return_only_outputs=True)
        logger.info("QA chain result: %s", result)

        answer = result.get("answer", "No answer found.")  # NOTE(review): answer text is currently not shown in the UI
        source_docs = result.get("source_documents", [])
        logger.info("Found %d source documents", len(source_docs))

        if not source_docs:
            msg = "ℹ️ No relevant content found in the audio."
            logger.info(msg)
            yield None, msg, update_progress(100, "No results found")
            return

        # Anchor the clip at the timestamp of the top-ranked source chunk.
        metadata = source_docs[0].metadata
        logger.info("Source document metadata: %s", metadata)

        start_time = float(metadata.get("start", 0))
        end_time = start_time + 30  # fixed 30-second clip window

        start_str = f"{int(start_time // 60)}:{int(start_time % 60):02d}"
        end_str = f"{int(end_time // 60)}:{int(end_time % 60):02d}"

        logger.info("Extracting clip from %s to %s...", start_str, end_str)
        yield None, f"Extracting clip from {start_str} to {end_str}...", update_progress(75, "Extracting audio...")

        try:
            logger.info("Calling clip() with source: %s, start: %s, end: %s", SOURCE_AUDIO, start_time, end_time)
            clip_path = clip(SOURCE_AUDIO, start_time, end_time)
            logger.info("Clip created at: %s", clip_path)

            # clip() may return a bogus path without raising; verify it.
            if not clip_path or not os.path.exists(clip_path):
                error_msg = f"Failed to create clip at {clip_path}"
                logger.error(error_msg)
                raise FileNotFoundError(error_msg)

            success_msg = f"🎧 Clip from {start_str} to {end_str}"
            logger.info(success_msg)
            yield clip_path, success_msg, update_progress(100, "Done!")

        except Exception as e:
            error_msg = f"❌ Error creating audio clip: {str(e)}"
            logger.error(error_msg, exc_info=True)
            yield None, error_msg, update_progress(0, "Error creating clip")

    except Exception as e:
        # Top-level boundary: report any unexpected failure to the UI.
        error_msg = f"❌ Error processing question: {str(e)}"
        logger.error(error_msg, exc_info=True)
        yield None, error_msg, update_progress(0, "Error occurred")
|
|
|
|
|
def upload_media(file, progress=gr.Progress()):
    """Build a searchable index from an uploaded media file.

    Non-MP3 uploads are converted to MP3 with ffmpeg, the audio is
    transcribed and indexed, and the QA chain is (re)built. On success the
    module globals SOURCE_AUDIO, store and qa_chain are rebound.

    Args:
        file: Gradio upload (tempfile wrapper with .name, or a path string).
        progress: Gradio progress tracker (mutable default is the documented
            Gradio convention for progress injection).

    Returns:
        A status string for the UI, prefixed with ✅ on success or ❌ on error.
    """
    global SOURCE_AUDIO, qa_chain, store, model_name

    if file is None:
        logger.error("No file was uploaded")
        return "❌ Error: No file was uploaded."

    try:
        progress(0.1, desc="Starting upload...")

        # Gradio may hand us a tempfile wrapper or a plain path string.
        file_path = file.name if hasattr(file, 'name') else str(file)
        logger.info("Processing uploaded file: %s", file_path)

        if not os.path.exists(file_path):
            error_msg = f"File not found at path: {file_path}"
            logger.error(error_msg)
            return f"❌ Error: {error_msg}"

        # Normalize everything to MP3 so downstream clipping is uniform.
        if not file_path.lower().endswith('.mp3'):
            progress(0.2, desc="Converting to MP3 format...")
            logger.info("Converting file to MP3 format...")
            base_name = os.path.splitext(file_path)[0]
            audio_path = f"{base_name}.mp3"

            try:
                # -q:a 0 = best VBR quality; -map a = audio streams only;
                # -y = overwrite any previous conversion output.
                cmd = [
                    'ffmpeg',
                    '-i', file_path,
                    '-q:a', '0',
                    '-map', 'a',
                    '-y',
                    audio_path
                ]
                result = subprocess.run(cmd, capture_output=True, text=True)

                if result.returncode != 0:
                    error_msg = f"Failed to convert file to MP3: {result.stderr}"
                    logger.error(error_msg)
                    return f"❌ Error: {error_msg}"

                file_path = audio_path
                logger.info("Successfully converted to MP3: %s", file_path)

            except Exception as e:
                error_msg = f"Error during MP3 conversion: {str(e)}"
                logger.error(error_msg, exc_info=True)
                return f"❌ {error_msg}"

        SOURCE_AUDIO = file_path  # clips are cut from this file later

        data_dir = "data"
        os.makedirs(data_dir, exist_ok=True)

        progress(0.4, desc="Transcribing audio with Whisper (this may take a few minutes)...")
        logger.info("Starting transcription and index building...")

        try:
            store = build_index(file_path, data_dir)

            if not store:
                error_msg = "Failed to build index - no documents were processed"
                logger.error(error_msg)
                return f"❌ {error_msg}"

            progress(0.9, desc="Initializing QA system...")
            logger.info("Initializing QA chain...")

            qa_chain = build_chain(store, model_name)

            if not qa_chain:
                error_msg = "Failed to initialize QA chain"
                logger.error(error_msg)
                return f"❌ {error_msg}"

            progress(1.0, desc="Ready!")
            # The original success string literal was corrupted by a bad
            # encoding round-trip and split across two lines (a syntax
            # error); restored as a single-line message.
            success_msg = f"✅ Ready! Successfully processed {os.path.basename(file_path)}"
            logger.info(success_msg)
            return success_msg

        except Exception as e:
            error_msg = f"Error during index building: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return f"❌ {error_msg}"

    except Exception as e:
        # Top-level boundary: report any unexpected failure to the UI.
        error_msg = f"Unexpected error: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return f"❌ {error_msg}"
|
|
|
|
|
def tail_log(n: int = 200):
    """Return the last *n* entries of the JSONL debug log, pretty-printed.

    Lines that are not valid JSON are skipped. Returns "{}" when the log
    file does not exist.
    """
    path = os.path.join(os.path.dirname(__file__), "langchain_debug.jsonl")
    if not os.path.exists(path):
        return "{}"
    with open(path, "r", encoding="utf-8") as fh:
        lines = fh.readlines()
    entries = []
    for raw_line in lines[-n:]:
        try:
            entries.append(json.loads(raw_line))
        except json.JSONDecodeError:
            pass  # skip malformed / partially-written lines
    return "\n\n".join(json.dumps(entry, indent=2) for entry in entries)
|
|
|
|
|
# --- Gradio UI definition -------------------------------------------------
with gr.Blocks() as demo:

    # Enable request queuing so generator handlers (handle) can stream updates.
    demo.queue()
    with gr.Tab("Ask"):
        gr.Markdown("# ClipQuery: Upload any audio/video and ask questions about it. ")
        gr.Markdown("### The clip will be extracted from the point in the media where the answer most likely occurs.")

        with gr.Row():
            with gr.Column(scale=3):
                # Model picker; the first word of the label is the backend name.
                model_dd = gr.Dropdown(
                    ["flan-t5-base (HuggingFace)", "phi3 (Local - requires Ollama)", "tinyllama (Local - requires Ollama)"],
                    label="Select Model",
                    value="phi3 (Local - requires Ollama)"
                )
            with gr.Column(scale=2):
                # Token box is hidden unless the HF-hosted model is selected.
                hf_token = gr.Textbox(
                    label="Hugging Face Token (required for flan-t5-base)",
                    type="password",
                    visible=False,
                    placeholder="Enter your Hugging Face token..."
                )

        def toggle_token_visibility(model_name):
            # Show the token field only for the flan-t5-base model.
            # NOTE(review): parameter shadows the module-level `model_name`.
            return gr.update(visible="flan-t5-base" in model_name)

        model_dd.change(
            fn=toggle_token_visibility,
            inputs=model_dd,
            outputs=hf_token
        )

        # NOTE(review): the return value is discarded, so this call has no
        # effect; the initial visibility comes from visible=False above.
        toggle_token_visibility(model_dd.value)

        uploader = gr.File(label="Upload audio/video", file_types=["audio", "video"])
        status = gr.Markdown()
        inp = gr.Textbox(label="Ask a question")
        out_audio = gr.Audio()
        ts_label = gr.Markdown()

        # Static progress-bar markup; driven by the updateProgress() JS below.
        with gr.Row():
            progress = gr.HTML("""
                <div style='width: 100%; margin: 10px 0;'>
                    <div style='display: flex; justify-content: space-between; margin-bottom: 5px;'>
                        <span id='status'>Ready</span>
                        <span id='progress'>0%</span>
                    </div>
                    <div style='height: 20px; background: #f0f0f0; border-radius: 10px; overflow: hidden;'>
                        <div id='progress-bar' style='height: 100%; width: 0%; background: #4CAF50; transition: width 0.3s;'></div>
                    </div>
                </div>
            """)

    # Client-side helper invoked via the <script> strings emitted by
    # update_progress(); also re-colors the bar by progress threshold.
    js = """
        function updateProgress(progress, message) {
            const bar = document.getElementById('progress-bar');
            const percent = document.getElementById('progress');
            const status = document.getElementById('status');

            // Ensure progress is a number and has a default
            const progressValue = Number(progress) || 0;

            bar.style.width = progressValue + '%';
            percent.textContent = progressValue + '%';
            status.textContent = message || 'Processing...';

            if (progressValue >= 100) {
                bar.style.background = '#4CAF50';
                status.textContent = 'Done!';
            } else if (progressValue >= 75) {
                bar.style.background = '#2196F3';
            } else if (progressValue >= 50) {
                bar.style.background = '#FFC107';
            } else if (progressValue >= 25) {
                bar.style.background = '#FF9800';
            } else {
                bar.style.background = '#f44336';
            }
        }
        // Initialize on load
        document.addEventListener('DOMContentLoaded', function() {
            updateProgress(0, 'Ready');
        });
    """
    # Register the JS once at page load so updateProgress() is defined.
    demo.load(fn=None, inputs=None, outputs=None, js=js)
|
def _on_model_change(label, token): |
|
global model_name, qa_chain, store |
|
|
|
name = label.split()[0] |
|
if name == model_name: |
|
return "" |
|
|
|
|
|
if name in ('phi3', 'tinyllama'): |
|
try: |
|
import requests |
|
response = requests.get('http://localhost:11434', timeout=5) |
|
if response.status_code != 200: |
|
raise ConnectionError("Ollama server not running. Please start it first.") |
|
except Exception as e: |
|
return f"β Error: {str(e)}. Please make sure Ollama is running." |
|
|
|
if store is None and name != "flan-t5-base": |
|
return "β οΈ Please upload a media file before changing models." |
|
|
|
try: |
|
if name == "flan-t5-base" and not token: |
|
return "β οΈ Please enter your Hugging Face token to use flan-t5-base. Get one at https://huggingface.co/settings/tokens" |
|
|
|
|
|
hf_token = token if name == "flan-t5-base" else None |
|
qa_chain = build_chain(store, name, hf_token) |
|
model_name = name |
|
return f"β
Switched to {label}" |
|
except Exception as e: |
|
return f"β Failed to switch model: {str(e)}" |
|
    # Second change-handler on model_dd: the first (above) only toggles the
    # token box; this one actually rebuilds the QA chain.
    model_dd.change(
        fn=_on_model_change,
        inputs=[model_dd, hf_token],
        outputs=status
    )

    # Build/rebuild the index whenever a new file is uploaded.
    uploader.change(
        fn=upload_media,
        inputs=uploader,
        outputs=status,
        api_name="upload_media"
    )
    # handle() is a generator, so progress/status updates stream to the UI.
    inp.submit(
        fn=handle,
        inputs=inp,
        outputs=[out_audio, ts_label, progress],
        show_progress=False
    )
|
|
|
    with gr.Tab("Debug Log"):
        log_box = gr.Textbox(label="Application Logs", lines=25, max_lines=25, interactive=False)
        refresh_btn = gr.Button("Refresh Logs")

        def refresh_logs():
            # Imported here to avoid a module-level import cycle with
            # logging_config; returns at most the last 5000 characters.
            from logging_config import get_logs
            logs = get_logs()
            return f"""
===== LATEST LOGS =====
{logs[-5000:] if len(logs) > 5000 else logs}
======================
"""

        refresh_btn.click(refresh_logs, None, log_box)
    # Auto-refresh the log view every 5 seconds while the page is open.
    demo.load(refresh_logs, None, log_box, every=5)
|
|
|
if __name__ == "__main__":
    # Launch the Gradio app (blocking) when run as a script.
    demo.launch()
|
|