"""
Audio processing utilities for EceMotion Pictures.

Enhanced text-to-speech generation with robust error handling and fallbacks.
"""

import logging
import os
from typing import Any, Dict, Optional, Tuple

import numpy as np

from config import (
    AUDIO_SAMPLE_RATE,
    MODEL_AUDIO,
    MODEL_CONFIGS,
    get_device,
    get_safe_model_name,
)

logger = logging.getLogger(__name__)

# Model loaded when the requested TTS model cannot be loaded.
_FALLBACK_TTS_MODEL = "parler-tts/parler-tts-mini-v1"

# Global model cache.
_tts_pipe = None
# Name of the model that is actually loaded in ``_tts_pipe``.
_current_tts_model = None
# Name that was requested when ``_tts_pipe`` was loaded. It differs from
# ``_current_tts_model`` when the fallback model had to be used.
_requested_tts_model = None


def get_tts_pipe(model_name: str = MODEL_AUDIO, device: Optional[str] = None):
    """Get or create the TTS pipeline with lazy loading and model switching.

    Args:
        model_name: Requested model; passed through ``get_safe_model_name``.
        device: Target device string. Defaults to ``get_device()``.

    Returns:
        The cached or freshly loaded pipeline, or ``None`` when neither the
        requested model nor the fallback model could be loaded.
    """
    global _tts_pipe, _current_tts_model, _requested_tts_model

    if device is None:
        device = get_device()

    # Use safe model name
    safe_model_name = get_safe_model_name(model_name, "audio")

    # Cache hit: the request matches either the loaded model or the request
    # that produced it. Matching on the request avoids reloading the failed
    # model and the fallback on every call.
    if _tts_pipe is not None and safe_model_name in (
        _current_tts_model,
        _requested_tts_model,
    ):
        return _tts_pipe

    logger.info(f"Loading TTS model: {safe_model_name}")
    try:
        if "f5-tts" in safe_model_name.lower():
            pipe = _load_f5_tts(safe_model_name, device)
        else:
            pipe = _load_standard_tts(safe_model_name, device)
        if pipe is None:
            raise RuntimeError("Failed to load any TTS model")
    except Exception as e:
        logger.error(f"Failed to load {safe_model_name}: {e}")
        if safe_model_name == _FALLBACK_TTS_MODEL:
            # The fallback itself just failed; an identical retry is pointless.
            _tts_pipe = None
        else:
            _tts_pipe = _load_standard_tts(_FALLBACK_TTS_MODEL, device)
        _current_tts_model = _FALLBACK_TTS_MODEL
        if _tts_pipe is None:
            logger.error(
                f"Fallback TTS model {_FALLBACK_TTS_MODEL} could not be loaded"
            )
    else:
        _tts_pipe = pipe
        _current_tts_model = safe_model_name
        logger.info(f"TTS model {safe_model_name} loaded successfully")

    _requested_tts_model = safe_model_name
    return _tts_pipe


def _load_f5_tts(model_name: str, device: str):
    """Load an F5-TTS model; return the pipeline or ``None`` on failure."""
    try:
        from transformers import pipeline

        return pipeline(
            "text-to-speech",
            model=model_name,
            torch_dtype="auto",
            device_map=device if device == "cuda" else None,
        )
    except Exception as e:
        logger.error(f"Failed to load F5-TTS: {e}")
        return None


def _load_standard_tts(model_name: str, device: str):
    """Load a standard TTS model; return the pipeline or ``None`` on failure.

    The device is handed to ``pipeline(...)`` at construction time rather
    than moved afterwards with ``.to()``.
    """
    try:
        from transformers import pipeline
        import torch

        # Fix device string - convert "auto" to proper device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        use_cuda = device == "cuda"
        pipeline_kwargs = {
            "model": model_name,
            "torch_dtype": torch.float16 if use_cuda else torch.float32,
        }
        if use_cuda:
            pipeline_kwargs["device"] = device

        return pipeline("text-to-speech", **pipeline_kwargs)
    except Exception as e:
        logger.error(f"Failed to load standard TTS: {e}")
        return None


def _to_numpy_audio(wav) -> np.ndarray:
    """Convert pipeline audio output to a float32 array without unit axes."""
    if hasattr(wav, "detach"):
        # Torch tensor: drop autograd, move to host, widen half precision.
        wav = wav.detach().cpu().float().numpy()
    elif hasattr(wav, "numpy"):
        wav = wav.numpy()
    return np.atleast_1d(np.squeeze(np.asarray(wav, dtype=np.float32)))


def synth_voice(
    text: str,
    voice_prompt: str,
    sr: int = AUDIO_SAMPLE_RATE,
    model_name: str = MODEL_AUDIO,
    device: Optional[str] = None,
) -> Tuple[int, np.ndarray]:
    """Generate speech from text with enhanced TTS support.

    Args:
        text: Text to speak. Truncated or space-padded to the model's
            configured ``max_text_length`` / ``min_text_length``.
        voice_prompt: Passed to the pipeline as ``voice_preset``.
        sr: Sample rate of the returned audio.
        model_name: Requested TTS model.
        device: Target device string. Defaults to ``get_device()``.

    Returns:
        ``(sr, wav)`` where ``wav`` is float32. If synthesis fails for any
        reason, the result of ``_create_fallback_audio`` is returned instead.
    """
    if device is None:
        device = get_device()

    tts = get_tts_pipe(model_name, device)
    model_config = MODEL_CONFIGS.get(_current_tts_model, {})

    # Validate text length
    max_length = model_config.get("max_text_length", 500)
    min_length = model_config.get("min_text_length", 10)

    if len(text) > max_length:
        logger.warning(f"Text too long ({len(text)} chars), truncating to {max_length}")
        text = text[:max_length]
    elif len(text) < min_length:
        logger.warning(f"Text too short ({len(text)} chars), padding")
        text = text + " " * (min_length - len(text))

    try:
        if tts is None:
            raise RuntimeError("no TTS model is loaded")

        if "f5-tts" in (_current_tts_model or "").lower():
            # F5-TTS specific generation
            result = tts(
                text=text,
                voice_preset=voice_prompt,
                return_tensors="pt",
            )
            wav = _to_numpy_audio(result["audio"]).flatten()
        else:
            # Standard pipeline (Parler-TTS, etc.)
            result = tts({"text": text, "voice_preset": voice_prompt})
            wav = _to_numpy_audio(result["audio"])

        # Prefer the rate reported by the pipeline; assuming
        # AUDIO_SAMPLE_RATE would change pitch and speed for models that
        # generate at a different rate.
        source_sr = AUDIO_SAMPLE_RATE
        if isinstance(result, dict):
            source_sr = result.get("sampling_rate") or AUDIO_SAMPLE_RATE

        # Normalize audio
        wav = normalize_audio(wav)

        # Resample if needed
        if sr != source_sr:
            wav = _resample_audio(wav, source_sr, sr)

        logger.info(f"Generated audio: {len(wav)/sr:.2f}s at {sr}Hz")
        return sr, wav.astype(np.float32)

    except Exception as e:
        logger.error(f"Voice synthesis failed: {e}")
        # Return fallback audio
        return _create_fallback_audio(text, sr)


def _resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Resample audio using librosa, or linear interpolation without it.

    Raises:
        ValueError: If either sample rate is not positive.
    """
    if orig_sr <= 0 or target_sr <= 0:
        raise ValueError("Sample rates must be positive")

    audio = np.asarray(audio)
    if orig_sr == target_sr or audio.size == 0:
        return audio

    try:
        import librosa

        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    except ImportError:
        # Simple resampling without librosa
        ratio = target_sr / orig_sr
        new_length = int(len(audio) * ratio)
        # Output sample i sits at input position i / ratio, so the grid
        # stays within the input instead of running one sample past its end.
        positions = np.arange(new_length) / ratio
        return np.interp(positions, np.arange(len(audio)), audio)


def _create_fallback_audio(text: str, sr: int) -> Tuple[int, np.ndarray]:
    """Create fallback audio (a two-partial tone) when TTS fails.

    The tone lasts ``len(text) / 20`` seconds, with a minimum of one second.
    If tone generation fails, two seconds of silence are returned.
    """
    try:
        # Create a simple tone based on text length
        duration = max(1.0, len(text) / 20.0)  # Rough estimate
        t = np.linspace(0, duration, int(sr * duration), endpoint=False)

        # Generate a simple tone
        frequency = 440.0  # A4 note
        wav = 0.1 * np.sin(2 * np.pi * frequency * t)

        # Add some variation
        wav += 0.05 * np.sin(2 * np.pi * frequency * 1.5 * t)

        logger.info(f"Created fallback audio: {duration:.2f}s")
        return sr, wav.astype(np.float32)

    except Exception as e:
        logger.error(f"Failed to create fallback audio: {e}")
        # Last resort: silence. Clamp so a non-positive rate cannot raise here.
        duration = 2.0
        wav = np.zeros(max(0, int(sr * duration)))
        return sr, wav.astype(np.float32)


def normalize_audio(audio: np.ndarray, target_lufs: float = -23.0) -> np.ndarray:
    """Peak-normalize audio to 0.95 and apply gentle compression.

    Args:
        audio: Input samples.
        target_lufs: Currently unused; kept for interface compatibility.

    Returns:
        The normalized, compressed samples. Empty input is returned as-is.
    """
    audio = np.asarray(audio)
    if audio.size == 0:
        return audio

    # Simple peak normalization first
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak * 0.95

    # Apply gentle compression
    return apply_compression(audio)


def apply_compression(
    audio: np.ndarray, ratio: float = 3.0, threshold: float = 0.7
) -> np.ndarray:
    """Apply gentle hard-knee compression.

    The part of each sample's magnitude above ``threshold`` is divided by
    ``ratio``; samples at or below the threshold pass through unchanged.
    """
    audio = np.asarray(audio)
    # Work in floating point so integer input is not truncated on assignment.
    compressed = np.array(audio, dtype=np.result_type(audio.dtype, np.float32))

    magnitude = np.abs(audio)
    above_threshold = magnitude > threshold
    compressed[above_threshold] = np.sign(audio[above_threshold]) * (
        threshold + (magnitude[above_threshold] - threshold) / ratio
    )
    return compressed


def retro_bed(
    duration_s: float, sr: int = AUDIO_SAMPLE_RATE, bpm: int = 92
) -> Tuple[int, np.ndarray]:
    """Generate retro synth background music.

    Args:
        duration_s: Length of the bed in seconds.
        sr: Sample rate.
        bpm: Currently unused; kept for interface compatibility.

    Returns:
        ``(sr, bed)`` with ``bed`` as float32. Silence of the same length is
        returned if generation fails; a non-positive duration gives an empty
        array.
    """
    num_samples = max(0, int(sr * duration_s))
    if num_samples == 0:
        return sr, np.zeros(0, dtype=np.float32)

    try:
        t = np.linspace(0, duration_s, num_samples, endpoint=False)

        # Chord progression root frequencies (A minor style)
        freqs = [220.0, 174.61, 196.0, 146.83]
        seg_len = num_samples // len(freqs)
        sig = np.zeros_like(t)

        for i, f0 in enumerate(freqs):
            start = i * seg_len
            # The last segment absorbs the remainder so no tail is left silent.
            stop = num_samples if i == len(freqs) - 1 else start + seg_len
            tri_t = t[start:stop]
            tri = 2 * np.abs(2 * ((tri_t * f0) % 1) - 1) - 1
            sig[start:stop] = 0.15 * tri

        # Add tape noise
        noise = 0.01 * np.random.randn(num_samples)
        bed = sig + noise

        # Apply gentle lowpass filter
        try:
            from scipy import signal

            # The cutoff must stay below Nyquist for low sample rates.
            cutoff = min(3000, 0.45 * sr)
            b, a = signal.butter(3, cutoff, 'low', fs=sr)
            bed = signal.lfilter(b, a, bed)
        except ImportError:
            # Simple averaging filter if scipy not available
            bed = np.convolve(bed, np.ones(5) / 5, mode='same')

        return sr, bed.astype(np.float32)

    except Exception as e:
        logger.error(f"Failed to generate retro bed: {e}")
        # Return silence
        return sr, np.zeros(num_samples, dtype=np.float32)


def mix_to_stereo(sr1, a, sr2, b, bed_gain=0.5):
    """Mix a foreground signal and a background bed into a stereo signal.

    Args:
        sr1: Sample rate of ``a``.
        a: Foreground samples, mono ``(n,)`` or multi-channel ``(n, ch)``.
        sr2: Sample rate of ``b``; must equal ``sr1``.
        b: Background samples, same layouts as ``a``.
        bed_gain: Gain applied to ``b``.

    Returns:
        ``(sr1, stereo)`` where ``stereo`` has shape ``(n, 2)`` and is clipped
        to [-1, 1]. The shorter input is zero-padded. The right channel is
        scaled by 0.9 relative to the left.

    Raises:
        AssertionError: If the sample rates differ.
    """
    # Raised explicitly so the check survives ``python -O`` while keeping the
    # exception type of the original ``assert``.
    if sr1 != sr2:
        raise AssertionError("Sample rates must match")

    a = np.asarray(a)
    b = np.asarray(b)
    n = max(len(a), len(b))

    def split_channels(x):
        """Zero-pad ``x`` to ``n`` samples and return (left, right) views."""
        if len(x) < n:
            padding = np.zeros((n - len(x),) + x.shape[1:])
            x = np.concatenate([x, padding])
        if x.ndim == 1:
            return x, x
        # Multi-channel: first channel is left, second (if any) is right.
        return x[:, 0], x[:, min(1, x.shape[1] - 1)]

    a_left, a_right = split_channels(a)
    b_left, b_right = split_channels(b)

    left = a_left + bed_gain * b_left
    right = a_right * 0.9 + bed_gain * 0.9 * b_right

    stereo = np.stack([left, right], axis=1)
    return sr1, np.clip(stereo, -1.0, 1.0)


def write_wav(path: str, sr: int, wav: np.ndarray):
    """Write audio to a WAV file using soundfile, or scipy as a fallback.

    Raises:
        RuntimeError: If neither soundfile nor scipy is available.
    """
    try:
        import soundfile as sf
    except (ImportError, OSError):
        # OSError: soundfile is installed but libsndfile is missing.
        sf = None

    if sf is not None:
        sf.write(path, wav, sr)
        return

    # Fallback using scipy
    try:
        from scipy.io import wavfile
    except ImportError as exc:
        logger.error("No audio writing library available (soundfile or scipy)")
        raise RuntimeError(
            "Cannot write audio file - no audio library available"
        ) from exc

    # Convert to 16-bit; clip first so out-of-range samples do not wrap.
    clipped = np.clip(np.asarray(wav, dtype=np.float64), -1.0, 1.0)
    wav_16bit = (clipped * 32767).astype(np.int16)
    wavfile.write(path, sr, wav_16bit)