import gradio as gr
import torch
from transformers import AutoProcessor, VoxtralForConditionalGeneration
from pydub import AudioSegment
from pydub.silence import detect_silence
import yt_dlp
import requests
import validators
from urllib.parse import urlparse
import subprocess
import os
import re
import glob
import spaces
### Initializations
MAX_TOKENS = 32000
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"*** Device: {device}")
model_name = 'mistralai/Voxtral-Mini-3B-2507'
processor = AutoProcessor.from_pretrained(model_name)
model = VoxtralForConditionalGeneration.from_pretrained(model_name,
                                                        torch_dtype=torch.bfloat16,
                                                        device_map=device)
# Supported languages
dict_languages = {"English": "en",
                  "French": "fr",
                  "German": "de",
                  "Spanish": "es",
                  "Italian": "it",
                  "Portuguese": "pt",
                  "Dutch": "nl",
                  "Hindi": "hi"}
# Whitelist of allowed MIME types for audio and video
ALLOWED_MIME_TYPES = {
    # Audio
    'audio/mpeg', 'audio/wav', 'audio/wave', 'audio/x-wav', 'audio/x-pn-wav',
    'audio/ogg', 'audio/vorbis', 'audio/aac', 'audio/mp4', 'audio/flac',
    'audio/x-flac', 'audio/opus', 'audio/webm',
    # Video
    'video/mp4', 'video/mpeg', 'video/ogg', 'video/webm', 'video/quicktime',
    'video/x-msvideo', 'video/x-matroska'
}
# Maximum allowed file size (in bytes), e.g. 1 GB
MAX_FILE_SIZE = 1 * 1024 * 1024 * 1024  # 1 GB
# Directory where the downloaded files will be saved
DOWNLOAD_DIR = "downloaded_files"
if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)
MAX_LEN = 1800000  # 30 min, in milliseconds
ONE_SECOND_MS = 1000  # 1 s of padding added around chunks, in milliseconds
#### Functions
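# chunks_creation: if the input exceeds MAX_LEN (30 min), split it into chunks that each
# fit within the limit. Cut points are taken from detected silences, so speech is not cut
# mid-sentence; each chunk keeps one extra second of the following silence as padding.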
def chunks_creation(audio_path):
    list_audio_path = [audio_path]
    audio = AudioSegment.from_file(audio_path)
    status = gr.Markdown("Audio duration less than max: no chunking needed")
    # Input too large?
    if len(audio) > MAX_LEN:
        list_audio_path = []
        try:
            # Detect silent ranges, to use them as cut points
            list_silent = detect_silence(audio, min_silence_len=300,
                                         # silent if quieter than -14 dBFS threshold
                                         silence_thresh=audio.dBFS - 14, seek_step=100)
            list_interval = [(start, stop) for start, stop in list_silent]
            # Calculate speech intervals (the ranges between silences)
            list_speech = []
            current_start = 0
            for start, stop in list_interval:
                if current_start < start:
                    list_speech.append((current_start, start))
                current_start = stop
            # Add last interval if needed
            if current_start < len(audio):
                list_speech.append((current_start, len(audio)))
            # Determination of chunks, to fit within the maximum duration
            list_chunks = []
            deb_chunk, fin_chunk = 0, list_speech[0][1]
            for start, end in list_speech[1:]:
                if end - deb_chunk + ONE_SECOND_MS <= MAX_LEN:
                    fin_chunk = end + ONE_SECOND_MS
                else:
                    list_chunks.append([deb_chunk, fin_chunk])
                    deb_chunk, fin_chunk = start, end
            list_chunks.append([deb_chunk, fin_chunk + ONE_SECOND_MS])
            # Save chunks
            for i, (start, stop) in enumerate(list_chunks):
                segment = audio[start:stop]
                segment.export(f"chunk_{i}.wav", format="wav")
                list_audio_path.append(f"chunk_{i}.wav")
            status = f"✅ **Success!** {len(list_audio_path)} chunks saved."
        except Exception as e:
            status = gr.Markdown(f"❌ **Unexpected error during chunks creation:** {e}")
    return list_audio_path, status
###
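# ZeroGPU: request a GPU for the duration of the call (assumed from the unused `spaces`
# import and the Space running on Zero; the decorator is a no-op outside ZeroGPU Spaces)
@spaces.GPU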
def process_transcript(language: str, audio_path: str):
    """Process the audio file and return its transcription.
    Args:
        language: The language of the audio.
        audio_path: The path to the audio file.
    Returns:
        The transcribed text of the audio.
        The status of the transcription (with or without chunking).
    """
    result = ""
    status = gr.Markdown()
    if audio_path is None:
        status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
    else:
        id_language = dict_languages[language]
        # Check the duration, and split into chunks if needed
        list_audio_path, status = chunks_creation(audio_path)
        # Transcription process
        try:
            for path in list_audio_path:
                # Note: `apply_transcrition_request` (sic) is the method name exposed by the Voxtral processor
                inputs = processor.apply_transcrition_request(language=id_language,
                                                              audio=path, model_id=model_name)
                inputs = inputs.to(device, dtype=torch.bfloat16)
                outputs = model.generate(**inputs, max_new_tokens=MAX_TOKENS)
                decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:],
                                                         skip_special_tokens=True)
                result += decoded_outputs[0]
            status = "✅ **Success!** Transcription done."
        except Exception as e:
            status = gr.Markdown(f"❌ **Unexpected error during transcription:** {e}")
    return result, status
###
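# ZeroGPU: request a GPU for this call (assumed, as above)
@spaces.GPU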
def process_translate(language: str, audio_path: str):
    """Translate the audio file into the requested language."""
    result = ""
    status = gr.Markdown()
    if audio_path is None:
        status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
    else:
        try:
            conversation = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "audio",
                            "path": audio_path,
                        },
                        {"type": "text", "text": "Translate this into " + language},
                    ],
                }
            ]
            inputs = processor.apply_chat_template(conversation)
            inputs = inputs.to(device, dtype=torch.bfloat16)
            outputs = model.generate(**inputs, max_new_tokens=MAX_TOKENS)
            decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)
            result = decoded_outputs[0]
            status = "✅ **Success!** Translation done."
        except Exception as e:
            status = gr.Markdown(f"❌ **Unexpected error during translation:** {e}")
    return result, status
###
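# ZeroGPU: request a GPU for this call (assumed, as above)
@spaces.GPU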
def process_chat(question: str, audio_path: str):
    """Answer a free-form question about the audio file."""
    result = ""
    status = gr.Markdown()
    if audio_path is None:
        status = gr.Markdown("Please provide some input audio: either upload an audio file or use the microphone.")
    else:
        try:
            conversation = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "audio",
                            "path": audio_path,
                        },
                        {"type": "text", "text": question},
                    ],
                }
            ]
            inputs = processor.apply_chat_template(conversation)
            inputs = inputs.to(device, dtype=torch.bfloat16)
            outputs = model.generate(**inputs, max_new_tokens=500)
            decoded_outputs = processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)
            result = decoded_outputs[0]
            status = "✅ **Success!** Answer generated."
        except Exception as e:
            status = gr.Markdown(f"❌ **Unexpected error during chat:** {e}")
    return result, status
###
def disable_buttons():
    return gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False)
def enable_buttons():
    return gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True)
###
def clear_audio():
    return None, None, None, None
###
def voice_extract_demucs():
    """
    Returns the path of the voice-extracted file.
    """
    try:
        cmd = [
            "demucs",
            "--two-stems=vocals",
            "--out", "demucs",
            "audio_file.wav"
        ]
        subprocess.run(cmd, check=True)
        voice_path = os.path.join("demucs", "htdemucs", "audio_file", "vocals.wav")
        success_message = "✅ **Success!** Voice extracted."
        return voice_path, voice_path, gr.Markdown(success_message)
    except Exception as e:
        return None, None, gr.Markdown(f"❌ **Error:** An unexpected error occurred: {e}")
###
def secure_download_from_url(url: str):
    """
    Validates a URL and downloads the file if it is an authorized media type.
    Returns the path of the downloaded file or an error message.
    """
    # Step 1: Validate the URL format
    if not validators.url(url):
        return None, None, gr.Markdown("❌ **Error:** The provided URL is invalid.")
    try:
        # Step 2: Send a HEAD request to check the headers without downloading the content.
        # allow_redirects=True to follow redirects to the final file location.
        # timeout to avoid blocking requests.
        response = requests.head(url, allow_redirects=True, timeout=10)
        # Check if the request was successful (status code 2xx)
        response.raise_for_status()
        # Step 3: Validate the content type (MIME type)
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
        if content_type not in ALLOWED_MIME_TYPES:
            error_message = (
                f"❌ **Error:** The file type is not allowed.\n"
                f" - **Type detected:** `{content_type}`\n"
                f" - **Allowed types:** Audio and Video only."
            )
            return None, None, gr.Markdown(error_message)
        # Step 4: Validate the file size
        content_length = response.headers.get('Content-Length')
        if content_length and int(content_length) > MAX_FILE_SIZE:
            error_message = (
                f"❌ **Error:** The file is too large.\n"
                f" - **File size:** {int(content_length) / 1024 / 1024:.2f} MB\n"
                f" - **Maximum allowed size:** {MAX_FILE_SIZE / 1024 / 1024:.2f} MB"
            )
            return None, None, gr.Markdown(error_message)
        # Step 5: Secure streaming download
        with requests.get(url, stream=True, timeout=20) as r:
            r.raise_for_status()
            # Extract the file name from the URL
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)
            if not filename:  # If the URL ends with a '/'
                filename = "downloaded_media_file"
            filepath = os.path.join(DOWNLOAD_DIR, filename)
            # --- Step 6: Download the audio ---
            # Write the file in chunks to avoid overloading memory
            with open(filepath, 'wb') as f:
                downloaded_size = 0
                for chunk in r.iter_content(chunk_size=8192):
                    downloaded_size += len(chunk)
                    if downloaded_size > MAX_FILE_SIZE:
                        os.remove(filepath)  # Delete the partial file
                        return None, None, gr.Markdown("❌ **Error:** The file exceeds the maximum allowed size during download.")
                    f.write(chunk)
        # --- Step 7: Convert to WAV using pydub ---
        audio_file = AudioSegment.from_file(filepath)
        audio_file.export("audio_file.wav", format="wav")
        # --- Step 8: Clean up the downloaded files ---
        try:
            files = glob.glob(os.path.join(DOWNLOAD_DIR, "*"))
            for f in files:
                os.remove(f)
        except OSError:
            pass
        success_message = "✅ **Success!** File downloaded and saved."
        # Return the file path and a success message.
        return "audio_file.wav", "audio_file.wav", gr.Markdown(success_message)
    except requests.exceptions.RequestException as e:
        # Handle network errors (timeout, DNS, connection refused, etc.)
        return None, None, gr.Markdown(f"❌ **Network error:** Unable to reach URL. Details: {e}")
    except Exception as e:
        # Handle other potential errors
        return None, None, gr.Markdown(f"❌ **Unexpected error:** {e}")
###
def secure_download_youtube_audio(url: str):
    """
    Returns the path of the downloaded audio file or an error message.
    """
    # --- Step 1: Validate URL format with a regex ---
    youtube_regex = re.compile(
        r'^(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})')
    if not youtube_regex.match(url):
        return None, None, gr.Markdown(f"❌ **Error:** The URL '{url}' does not appear to be a valid YouTube URL.")
    try:
        # --- Step 2: Check video availability ---
        ydl_info_opts = {'quiet': True, 'skip_download': True}
        try:
            with yt_dlp.YoutubeDL(ydl_info_opts) as ydl:
                info = ydl.extract_info(url, download=False)
        except yt_dlp.utils.DownloadError as e:
            return None, None, gr.Markdown(f"❌ **Error:** The video at URL '{url}' is unavailable ({str(e)})")
        # --- Step 3: Select the best audio format ---
        formats = [f for f in info['formats'] if f.get('acodec') != 'none']
        if not formats:
            return None, None, gr.Markdown("❌ **Error:** No audio stream was found for this video.")
        formats.sort(key=lambda f: f.get('abr') or 0, reverse=True)
        best_audio_format = formats[0]
        # --- Step 4: Check the file size BEFORE downloading ---
        filesize = best_audio_format.get('filesize') or best_audio_format.get('filesize_approx')
        if filesize is None:
            print("Could not determine file size before downloading.")
            filesize = 1  # unknown size: let the download proceed
        if filesize > MAX_FILE_SIZE:
            return None, None, gr.Markdown(
                f"❌ **Error:** The file is too large.\n"
                f" - **File size:** {filesize / 1024 / 1024:.2f} MB\n"
                f" - **Maximum allowed size:** {MAX_FILE_SIZE / 1024 / 1024:.2f} MB"
            )
        # --- Step 5: Download & convert directly to WAV ---
        ydl_opts = {
            'quiet': True,
            'format': f"{best_audio_format['format_id']}",
            'outtmpl': "audio_file",  # extension added by the ffmpeg post-processor
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
                'preferredquality': '192',
            }],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        success_message = "✅ **Success!** Audio extracted and saved."
        return "audio_file.wav", "audio_file.wav", gr.Markdown(success_message)
    except FileNotFoundError:
        return None, None, gr.Markdown("❌ **Error:** FFmpeg not found. Please ensure it is installed and in your system's PATH.")
    except Exception as e:
        return None, None, gr.Markdown(f"❌ **Error:** An unexpected error occurred: {e}")
###
#### Gradio interface
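# UI layout: three input tabs (upload/record, direct file URL, YouTube URL) feed the shared
# `sel_audio` state; three task panels (transcription, translation, Q&A) read from it.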
with gr.Blocks(title="Voxtral") as voxtral:
    with gr.Row():
        gr.Markdown("# **Voxtral Mini Evaluation**")
    with gr.Accordion("More on Voxtral", open=False):
        gr.Markdown("""## **Key Features:**
#### Voxtral builds upon Ministral-3B with powerful audio understanding capabilities.
##### - **Dedicated transcription mode**: Voxtral can operate in a pure speech transcription mode to maximize performance. By default, Voxtral automatically predicts the source audio language and transcribes the text accordingly
##### - **Long-form context**: With a 32k token context length, Voxtral handles audios up to 30 minutes for transcription, or 40 minutes for understanding
##### - **Built-in Q&A and summarization**: Supports asking questions directly through audio. Analyze audio and generate structured summaries without the need for separate ASR and language models
##### - **Natively multilingual**: Automatic language detection and state-of-the-art performance in the world's most widely used languages (English, Spanish, French, Portuguese, Hindi, German, Dutch, Italian)
##### - **Function-calling straight from voice**: Enables direct triggering of backend functions, workflows, or API calls based on spoken user intents
##### - **Highly capable at text**: Retains the text understanding capabilities of its language model backbone, Ministral-3B""")
    gr.Markdown("""#### Voxtral Mini is an enhancement of **Ministral 3B**, incorporating state-of-the-art audio input \
capabilities while retaining best-in-class text performance. It excels at speech transcription, translation and \
audio understanding. Available languages: English, Spanish, French, Portuguese, Hindi, German, Dutch, Italian.""")
gr.Markdown("### **1.Choose the audio:**") | |
sel_audio = gr.State() | |
with gr.Row(): | |
with gr.Tabs(): | |
with gr.Tab("From record or file upload"): | |
gr.Markdown("### **Upload an audio file, record via microphone, or select a demo file:**") | |
gr.Markdown("### *(Voxtral handles audios up to 30 minutes for transcription; if longer, it will be cut into chunks)*") | |
sel_audio1 = gr.Audio(sources=["upload", "microphone"], type="filepath", | |
label="Set an audio file to process it:") | |
example1 = [["mapo_tofu.mp3"]] | |
gr.Examples( | |
examples=example1, | |
inputs=sel_audio1, | |
outputs=None, | |
fn=None, | |
cache_examples=False, | |
run_on_click=False | |
) | |
status_output1 = gr.Markdown() | |
with gr.Row(): | |
voice_button1 = gr.Button("Extract voice (if noisy environment)") | |
voice_button1.click( | |
fn=voice_extract_demucs, | |
outputs=[sel_audio, sel_audio1, status_output1]) | |
clear_audio1 = gr.Button("Clear audio") | |
clear_audio1.click( | |
fn=clear_audio, | |
outputs=[sel_audio, sel_audio, sel_audio1, status_output1]) | |
with gr.Tab("From file url (audio or video file)"): | |
gr.Markdown("### **Enter the url of the file (mp3, wav, mp4, ...):**") | |
url_input2 = gr.Textbox(label="URL (MP3 or MP4 file)", | |
placeholder="https://huggingface.co/datasets/merve/vlm_test_images/resolve/main/mapo_tofu.mp4") | |
example2 = [["https://huggingface.co/datasets/merve/vlm_test_images/resolve/main/mapo_tofu.mp4"]] | |
gr.Examples( | |
examples=example2, | |
inputs=url_input2, | |
outputs=None, | |
fn=None, | |
cache_examples=False, | |
run_on_click=False | |
) | |
download_button2 = gr.Button("Check and upload", variant="primary") | |
input_audio2 = gr.Audio() | |
status_output2 = gr.Markdown() | |
download_button2.click( | |
fn=secure_download_from_url, | |
inputs=url_input2, | |
outputs=[input_audio2, sel_audio, status_output2] | |
) | |
with gr.Row(): | |
voice_button2 = gr.Button("Extract voice (if noisy environment)") | |
voice_button2.click( | |
fn=voice_extract_demucs, | |
outputs=[input_audio2, sel_audio, status_output2]) | |
clear_audio1 = gr.Button("Clear audio") | |
clear_audio1.click( | |
fn=clear_audio, | |
outputs=[sel_audio, url_input2, input_audio2, status_output2]) | |
with gr.Tab("From Youtube url:"): | |
gr.Markdown("### **Enter the url of the Youtube video:**") | |
url_input3 = gr.Textbox(label="Youtube url", | |
placeholder="https://www.youtube.com/...") | |
download_button3 = gr.Button("Check and upload", variant="primary") | |
input_audio3 = gr.Audio() | |
status_output3 = gr.Markdown() | |
download_button3.click( | |
fn=secure_download_youtube_audio, | |
inputs=url_input3, | |
outputs=[input_audio3, sel_audio, status_output3] | |
) | |
with gr.Row(): | |
voice_button3 = gr.Button("Extract voice (if noisy environment)") | |
voice_button3.click( | |
fn=voice_extract_demucs, | |
outputs=[input_audio3, sel_audio, status_output3]) | |
clear_audio1 = gr.Button("Clear audio") | |
clear_audio1.click( | |
fn=clear_audio, | |
outputs=[sel_audio, url_input3, input_audio3, status_output3]) | |
    with gr.Row():
        gr.Markdown("### **2. Choose one of these tasks:**")
    with gr.Row():
        with gr.Column():
            with gr.Accordion("Transcription", open=True):
                sel_language = gr.Dropdown(
                    choices=list(dict_languages.keys()),
                    value="English",
                    label="Select the language of the audio file:"
                )
                submit_transcript = gr.Button("Extract transcription", variant="primary")
                text_transcript = gr.Textbox(label="Generated transcription", lines=10)
                status_transcript = gr.Markdown()
        with gr.Column():
            with gr.Accordion("Translation", open=True):
                sel_translate_language = gr.Dropdown(
                    choices=list(dict_languages.keys()),
                    value="English",
                    label="Select the language for translation:"
                )
                submit_translate = gr.Button("Translate audio file", variant="primary")
                text_translate = gr.Textbox(label="Generated translation", lines=10)
                status_translate = gr.Markdown()
        with gr.Column():
            with gr.Accordion("Ask audio file", open=True):
                question_chat = gr.Textbox(label="Enter your question about the audio file:",
                                           placeholder="Enter your question about the audio file")
                submit_chat = gr.Button("Ask audio file", variant="primary")
                example_chat = [["What is the subject of this audio file?"], ["Quels sont les ingrédients ?"]]
                gr.Examples(
                    examples=example_chat,
                    inputs=question_chat,
                    outputs=None,
                    fn=None,
                    cache_examples=False,
                    run_on_click=False
                )
                text_chat = gr.Textbox(label="Model answer", lines=10)
                status_chat = gr.Markdown()
    ### Processing
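    # Each task click disables the three submit buttons, runs the task, then re-enables them
    # via chained .then() calls, so a second task cannot be launched while one is running.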
    # Transcription
    submit_transcript.click(
        disable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
        trigger_mode="once",
    ).then(
        fn=process_transcript,
        inputs=[sel_language, sel_audio],
        outputs=[text_transcript, status_transcript]
    ).then(
        enable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
    )
    # Translation
    submit_translate.click(
        disable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
        trigger_mode="once",
    ).then(
        fn=process_translate,
        inputs=[sel_translate_language, sel_audio],
        outputs=[text_translate, status_translate]
    ).then(
        enable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
    )
    # Chat
    submit_chat.click(
        disable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
        trigger_mode="once",
    ).then(
        fn=process_chat,
        inputs=[question_chat, sel_audio],
        outputs=[text_chat, status_chat]
    ).then(
        enable_buttons,
        outputs=[submit_transcript, submit_translate, submit_chat],
    )
### Launch the app
if __name__ == "__main__":
    voxtral.queue().launch(debug=True)