Loren's picture
Upload app.py
8559f42 verified
import gradio as gr
import torch
from transformers import AutoProcessor, VoxtralForConditionalGeneration
from pydub import AudioSegment
from pydub.silence import detect_silence
import yt_dlp
import requests
import validators
from urllib.parse import urlparse
import subprocess
import os
import re
import glob
import spaces
### Initializations
MAX_TOKENS = 32000
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"*** Device: {device}")
model_name = 'mistralai/Voxtral-Mini-3B-2507'
processor = AutoProcessor.from_pretrained(model_name)
model = VoxtralForConditionalGeneration.from_pretrained(model_name,
torch_dtype=torch.bfloat16,
device_map=device)
# Supported languages
dict_languages = {"English": "en",
"French": "fr",
"German": "de",
"Spanish": "es",
"Italian": "it",
"Portuguese": "pt",
"Dutch": "nl",
"Hindi": "hi"}
# Whitelist of allowed MIME types for audio and video
ALLOWED_MIME_TYPES = {
# Audio
'audio/mpeg', 'audio/wav', 'audio/wave', 'audio/x-wav', 'audio/x-pn-wav',
'audio/ogg', 'audio/vorbis', 'audio/aac', 'audio/mp4', 'audio/flac',
'audio/x-flac', 'audio/opus', 'audio/webm',
# Video
'video/mp4', 'video/mpeg', 'video/ogg', 'video/webm', 'video/quicktime',
'video/x-msvideo', 'video/x-matroska'
}
# Maximum allowed file size (in bytes). Ex: 1 GB
MAX_FILE_SIZE = 1 * 1024 * 1024 * 1024 # 1 GB
# Directory where the files will be saved
DOWNLOAD_DIR = "downloaded_files"
if not os.path.exists(DOWNLOAD_DIR):
os.makedirs(DOWNLOAD_DIR)
MAX_LEN = 1800000 # 30 mn
one_second_silence = AudioSegment.silent(duration=1000)
#### Functions
@spaces.GPU
def chunks_creation(audio_path):
list_audio_path = [audio_path]
audio = AudioSegment.from_file(audio_path)
status = gr.Markdown("πŸ‘ Audio duration less than max")
# Input too large ?
if len(audio) > MAX_LEN:
list_audio_path = []
try:
# Create list of chunks
list_silent = detect_silence(audio,min_silence_len=300,
# silent if quieter than -14 dBFS threshold
silence_thresh=audio.dBFS-14, seek_step=100)
list_interval = [(start, stop) for start, stop in list_silent]
# Calculate speech intervals
list_speech = []
current_start = 0
for start, stop in list_interval:
if current_start < start:
list_interval.append((current_start, start))
current_start = stop
# Add last interval if needed
if current_start < len(audio):
list_speech.append((current_start, len(audio)))
# Determination of chunks, to fit within the maximum duration
list_chunks = []
deb_chunk, fin_chunk = 0, list_speech[0][1]
for start, end in list_speech[1:]:
if end - deb_chunk + one_second_silence <= MAX_LEN:
fin_chunk = end + one_second_silence
else:
list_chunks.append([deb_chunk, fin_chunk])
deb_chunk, fin_chunk = start, end
list_chunks.append([deb_chunk, fin_chunk+one_second_silence])
# Save chunks
for i, (start, stop) in enumerate(list_chunks):
segment = audio[start:stop]
segment.export(f"chunk_{i}.wav", format="wav")
list_audio_path.append(f"chunk_{i}.wav")
status = f"βœ… **Success!** {len(list_audio_path)} chunks saved."
except Exception as e:
status = gr.Markdown(f"❌ **Unexpected error during chuncks creation:** {e}")
return list_audio_path, status
###
@spaces.GPU
def process_transcript(language: str, audio_path: str) -> str:
"""Process the audio file to return its transcription.
Args:
language: The language of the audio.
audio_path: The path to the audio file.
Returns:
The transcribed text of the audio.
The status of transcription : with or without chunking.
"""
result = ""
status = gr.Markdown()
if audio_path is None:
status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
else:
id_language = dict_languages[language]
# Verification of the duration, for possible division into chunks
list_audio_path, status = chunks_creation(audio_path)
# Transcription process
try:
for path in list_audio_path:
inputs = processor.apply_transcrition_request(language=id_language,
audio=path, model_id=model_name)
inputs = inputs.to(device, dtype=torch.bfloat16)
outputs = model.generate(**inputs, max_new_tokens=MAX_TOKENS)
decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:],
skip_special_tokens=True)
result += decoded_outputs[0]
status = "βœ… **Success!** Transcription done."
except Exception as e:
status = gr.Markdown(f"❌ **Unexpected error during transcription:** {e}")
return result, status
###
@spaces.GPU
def process_translate(language: str, audio_path: str) -> str:
result = ""
status = gr.Markdown()
if audio_path is None:
status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
else:
try:
conversation = [
{
"role": "user",
"content": [
{
"type": "audio",
"path": audio_path,
},
{"type": "text", "text": "Translate this in "+language},
],
}
]
inputs = processor.apply_chat_template(conversation)
inputs = inputs.to(device, dtype=torch.bfloat16)
outputs = model.generate(**inputs, max_new_tokens=MAX_TOKENS)
decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)
result = decoded_outputs[0]
status = "βœ… **Success!** Translation done."
except Exception as e:
status = gr.Markdown(f"❌ **Unexpected error during translation:** {e}")
return result, status
###
@spaces.GPU
def process_chat(question: str, audio_path: str) -> str:
result = ""
status = gr.Markdown()
if audio_path is None:
status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
else:
try:
conversation = [
{
"role": "user",
"content": [
{
"type": "audio",
"path": audio_path,
},
{"type": "text", "text": question},
],
}
]
inputs = processor.apply_chat_template(conversation)
inputs = inputs.to(device, dtype=torch.bfloat16)
outputs = model.generate(**inputs, max_new_tokens=500)
decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)
result = decoded_outputs[0]
status = "βœ… **Success!** Translation done."
except Exception as e:
status = gr.Markdown(f"❌ **Unexpected error during translation:** {e}")
return result, status
###
def disable_buttons():
return gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False)
def enable_buttons():
return gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True)
###
def clear_audio():
return None, None, None, None
###
@spaces.GPU
def voice_extract_demucs():
"""
Returns the path of the voice extracted file.
"""
try:
cmd = [
"demucs",
"--two-stems=vocals",
"--out", "demucs",
"audio_file.wav"
]
subprocess.run(cmd, check=True)
voice_path = os.path.join("demucs", "htdemucs", "audio_file", "vocals.wav")
success_message = "βœ… **Success!** Voice extracted."
return voice_path, voice_path, gr.Markdown(success_message)
except Exception as e:
return None, None, gr.Markdown(f"❌ **Error:** An unexpected ERROR occurred: {e}")
###
def secure_download_from_url(url: str):
"""
Validates a URL and downloads the file if it is an authorized media.
Returns the path of the downloaded file or an error message.
"""
# Step 1: Validate the URL format
if not validators.url(url):
return None, None, gr.Markdown("❌ **Error:** The provided URL is invalid.")
try:
# Step 2: Send a HEAD request to check the headers without downloading the content
# allow_redirects=True to follow redirects to the final file location.
# timeout to avoid blocking requests.
response = requests.head(url, allow_redirects=True, timeout=10)
# Check if the request was successful (status code 2xx)
response.raise_for_status()
# Step 3: Validate the content type (MIME type)
content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
if content_type not in ALLOWED_MIME_TYPES:
error_message = (
f"❌ **Error:** The file type is not allowed.\n"
f" - **Type detected:** `{content_type}`\n"
f" - **Allowed types:** Audio and Video only."
)
return None, None, gr.Markdown(error_message)
# Step 4: Validate the file size
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > MAX_FILE_SIZE:
error_message = (
f"❌ **Error:** The file is too large.\n"
f" - **File size:** {int(content_length) / 1024 / 1024:.2f} MB\n"
f" - **Maximum allowed size:** {MAX_FILE_SIZE / 1024 / 1024:.2f} MB"
)
return None, None, gr.Markdown(error_message)
# Step 5: Secure streaming download
with requests.get(url, stream=True, timeout=20) as r:
r.raise_for_status()
# Extract the file name from the URL
parsed_url = urlparse(url)
filename = os.path.basename(parsed_url.path)
if not filename: # Si l'URL se termine par un '/'
filename = "downloaded_media_file"
filepath = os.path.join(DOWNLOAD_DIR, filename)
# --- Step 6: Download the audio ---
# Write the file in chunks to avoid overloading memory
with open(filepath, 'wb') as f:
downloaded_size = 0
for chunk in r.iter_content(chunk_size=8192):
downloaded_size += len(chunk)
if downloaded_size > MAX_FILE_SIZE:
os.remove(filepath) # Supprimer le fichier partiel
return None, None, gr.Markdown("❌ **Error:** The file exceeds the maximum allowed size during download.")
f.write(chunk)
# --- Step 7: Convert to WAV using Pydub ---
audio_file = AudioSegment.from_file(filepath)
file_handle = audio_file.export("audio_file.wav", format="wav")
# --- Step 8: Clean up ---
try:
files = glob.glob(DOWNLOAD_DIR)
for f in files:
os.remove(f)
except:
pass
success_message = (
f"βœ… **Success!** File downloaded and saved."
)
# Returns the file path and a success message.
return "audio_file.wav", "audio_file.wav", gr.Markdown(success_message)
except requests.exceptions.RequestException as e:
# Handle network errors (timeout, DNS, connection refused, etc.)
return None, None, gr.Markdown(f"❌ **Network error:** Unable to reach URL. Details: {e}")
except Exception as e:
# Handle Other potential errors
return None, None, gr.Markdown(f"❌ **Unexpected error:** {e}")
###
def secure_download_youtube_audio(url: str):
"""
Returns the path of the downloaded file or an error message.
"""
# --- Step 1: Validate URL format with Regex ---
youtube_regex = re.compile(
r'^(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})')
if not youtube_regex.match(url):
return None, None, gr.Markdown("❌ **Error:** The URL '{url}' does not appear to be a valid YouTube URL.")
try:
# --- Step 2: Check video availability ---
ydl_info_opts = {'quiet': True, 'skip_download': True}
try:
with yt_dlp.YoutubeDL(ydl_info_opts) as ydl:
info = ydl.extract_info(url, download=False)
except yt_dlp.utils.DownloadError as e:
return None, None, gr.Markdown(f"❌ **Error:** The video at URL '{url}' is unavailable ({str(e)})")
# --- Step 3: Select best audio format ---
formats = [f for f in info['formats'] if f.get('acodec') != 'none']
if not formats:
return None, None, gr.Markdown("❌ **Error:** No audio-only stream was found for this video.")
formats.sort(key=lambda f: f.get('abr') or 0, reverse=True)
best_audio_format = formats[0]
# --- Step 4: Check file size BEFORE downloading ---
filesize = best_audio_format.get('filesize') or best_audio_format.get('filesize_approx')
if filesize is None:
print("Could not determine file size before downloading.")
filesize = 1
if filesize > MAX_FILE_SIZE:
return None, None, gr.Markdown(
f"❌ **Error:** The file is too large.\n"
f" - **File size:** {filesize / 1024 / 1024:.2f} MB\n"
f" - **Maximum allowed size:** {MAX_FILE_SIZE / 1024 / 1024:.2f} MB"
)
# --- Step 5: Download & convert directly to WAV ---
ydl_opts = {
'quiet': True,
'format': f"{best_audio_format['format_id']}",
'outtmpl': "audio_file", # will be replaced by ffmpeg output
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
'preferredquality': '192',
}],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
success_message = "βœ… **Success!** Audio extracted and saved."
return "audio_file.wav", "audio_file.wav", gr.Markdown(success_message)
except FileNotFoundError:
return None, None, gr.Markdown("❌ **Error:** FFmpeg not found. Please ensure it is installed and in your system's PATH.")
except Exception as e:
return None, None, gr.Markdown(f"❌ **Error:** An unexpected ERROR occurred: {e}")
###
def voice_extract_demucs():
"""
Returns the path of the voice extracted file.
"""
try:
cmd = [
"demucs",
"--two-stems=vocals",
"--out", "demucs",
"audio_file.wav"
]
subprocess.run(cmd, check=True)
voice_path = os.path.join("demucs", "htdemucs", "audio_file", "vocals.wav")
success_message = "βœ… **Success!** Voice extracted."
return voice_path, voice_path, gr.Markdown(success_message)
except Exception as e:
return None, None, gr.Markdown(f"❌ **Error:** An unexpected ERROR occurred: {e}")
###
def clear_audio():
return None, None, None, None
###
#### Gradio interface
with gr.Blocks(title="Voxtral") as voxtral:
with gr.Row():
gr.Markdown("# **Voxtral Mini Evaluation**")
with gr.Accordion("πŸ”Ž More on Voxtral", open=False):
gr.Markdown("""## **Key Features:**
#### Voxtral builds upon Ministral-3B with powerful audio understanding capabilities.
##### - **Dedicated transcription mode**: Voxtral can operate in a pure speech transcription mode to maximize performance. By default, Voxtral automatically predicts the source audio language and transcribes the text accordingly
##### - **Long-form context**: With a 32k token context length, Voxtral handles audios up to 30 minutes for transcription, or 40 minutes for understanding
##### - **Built-in Q&A and summarization**: Supports asking questions directly through audio. Analyze audio and generate structured summaries without the need for separate ASR and language models
##### - **Natively multilingual**: Automatic language detection and state-of-the-art performance in the world’s most widely used languages (English, Spanish, French, Portuguese, Hindi, German, Dutch, Italian)
##### - **Function-calling straight from voice**: Enables direct triggering of backend functions, workflows, or API calls based on spoken user intents
##### - **Highly capable at text**: Retains the text understanding capabilities of its language model backbone, Ministral-3B""")
gr.Markdown("""#### Voxtral Mini is an enhancement of **Ministral 3B**, incorporating state-of-the-art audio input \
capabilities while retaining best-in-class text performance. It excels at speech transcription, translation and \
audio understanding. Available languages: English, Spanish, French, Portuguese, Hindi, German, Dutch, Italian.""")
gr.Markdown("### **1.Choose the audio:**")
sel_audio = gr.State()
with gr.Row():
with gr.Tabs():
with gr.Tab("From record or file upload"):
gr.Markdown("### **Upload an audio file, record via microphone, or select a demo file:**")
gr.Markdown("### *(Voxtral handles audios up to 30 minutes for transcription; if longer, it will be cut into chunks)*")
sel_audio1 = gr.Audio(sources=["upload", "microphone"], type="filepath",
label="Set an audio file to process it:")
example1 = [["mapo_tofu.mp3"]]
gr.Examples(
examples=example1,
inputs=sel_audio1,
outputs=None,
fn=None,
cache_examples=False,
run_on_click=False
)
status_output1 = gr.Markdown()
with gr.Row():
voice_button1 = gr.Button("Extract voice (if noisy environment)")
voice_button1.click(
fn=voice_extract_demucs,
outputs=[sel_audio, sel_audio1, status_output1])
clear_audio1 = gr.Button("Clear audio")
clear_audio1.click(
fn=clear_audio,
outputs=[sel_audio, sel_audio, sel_audio1, status_output1])
with gr.Tab("From file url (audio or video file)"):
gr.Markdown("### **Enter the url of the file (mp3, wav, mp4, ...):**")
url_input2 = gr.Textbox(label="URL (MP3 or MP4 file)",
placeholder="https://huggingface.co/datasets/merve/vlm_test_images/resolve/main/mapo_tofu.mp4")
example2 = [["https://huggingface.co/datasets/merve/vlm_test_images/resolve/main/mapo_tofu.mp4"]]
gr.Examples(
examples=example2,
inputs=url_input2,
outputs=None,
fn=None,
cache_examples=False,
run_on_click=False
)
download_button2 = gr.Button("Check and upload", variant="primary")
input_audio2 = gr.Audio()
status_output2 = gr.Markdown()
download_button2.click(
fn=secure_download_from_url,
inputs=url_input2,
outputs=[input_audio2, sel_audio, status_output2]
)
with gr.Row():
voice_button2 = gr.Button("Extract voice (if noisy environment)")
voice_button2.click(
fn=voice_extract_demucs,
outputs=[input_audio2, sel_audio, status_output2])
clear_audio1 = gr.Button("Clear audio")
clear_audio1.click(
fn=clear_audio,
outputs=[sel_audio, url_input2, input_audio2, status_output2])
with gr.Tab("From Youtube url:"):
gr.Markdown("### **Enter the url of the Youtube video:**")
url_input3 = gr.Textbox(label="Youtube url",
placeholder="https://www.youtube.com/...")
download_button3 = gr.Button("Check and upload", variant="primary")
input_audio3 = gr.Audio()
status_output3 = gr.Markdown()
download_button3.click(
fn=secure_download_youtube_audio,
inputs=url_input3,
outputs=[input_audio3, sel_audio, status_output3]
)
with gr.Row():
voice_button3 = gr.Button("Extract voice (if noisy environment)")
voice_button3.click(
fn=voice_extract_demucs,
outputs=[input_audio3, sel_audio, status_output3])
clear_audio1 = gr.Button("Clear audio")
clear_audio1.click(
fn=clear_audio,
outputs=[sel_audio, url_input3, input_audio3, status_output3])
with gr.Row():
gr.Markdown("### **2. Choose one of theese tasks:**")
with gr.Row():
with gr.Column():
with gr.Accordion("πŸ“ Transcription", open=True):
sel_language = gr.Dropdown(
choices=list(dict_languages.keys()),
value="English",
label="Select the language of the audio file:"
)
submit_transcript = gr.Button("Extract transcription", variant="primary")
text_transcript = gr.Textbox(label="πŸ’¬ Generated transcription", lines=10)
status_transcript = gr.Markdown()
with gr.Column():
with gr.Accordion("πŸ” Translation", open=True):
list_language = list(dict_languages.keys())
list_language.pop(list_language.index(sel_language.value)) # Fix: Access the value of the dropdown
sel_translate_language = gr.Dropdown(
choices=list(dict_languages.keys()),
value="English",
label="Select the language for translation:"
)
submit_translate = gr.Button("Translate audio file", variant="primary")
text_translate = gr.Textbox(label="πŸ’¬ Generated translation", lines=10)
status_translate = gr.Markdown()
with gr.Column():
with gr.Accordion("πŸ€– Ask audio file", open=True):
question_chat = gr.Textbox(label="Enter your question about audio file:", placeholder="Enter your question about audio file")
submit_chat = gr.Button("Ask audio file", variant="primary")
example_chat = [["What is the subject of this audio file?"], ["Quels sont les ingrΓ©dients ?"]]
gr.Examples(
examples=example_chat,
inputs=question_chat,
outputs=None,
fn=None,
cache_examples=False,
run_on_click=False
)
text_chat = gr.Textbox(label="πŸ’¬ Model answer", lines=10)
status_chat = gr.Markdown()
### Processing
# Transcription
submit_transcript.click(
disable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
trigger_mode="once",
).then(
fn=process_transcript,
inputs=[sel_language, sel_audio],
outputs=[text_transcript, status_transcript]
).then(
enable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
)
# Translation
submit_translate.click(
disable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
trigger_mode="once",
).then(
fn=process_translate,
inputs=[sel_translate_language, sel_audio],
outputs=[text_translate, status_translate]
).then(
enable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
)
# Chat
submit_chat.click(
disable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
trigger_mode="once",
).then(
fn=process_chat,
inputs=[question_chat, sel_audio],
outputs=[text_chat, status_chat]
).then(
enable_buttons,
outputs=[submit_transcript, submit_translate, submit_chat],
)
### Launch the app
if __name__ == "__main__":
voxtral.queue().launch(debug=True)