""" Audio segment processing for creating meaningful lyric segments for video generation. This module takes Whisper transcription results and intelligently segments them at natural pause points for synchronized video scene changes. """ import re from typing import List, Dict, Any def segment_lyrics(transcription_result: Dict[str, Any], min_segment_duration: float = 2.0, max_segment_duration: float = 8.0) -> List[Dict[str, Any]]: """ Segment the transcription into meaningful chunks for video generation. This function takes the raw Whisper transcription and creates logical segments by identifying natural pause points in the audio. Each segment represents a coherent lyrical phrase that will correspond to one video scene. Args: transcription_result: Dictionary from Whisper transcription containing 'segments' min_segment_duration: Minimum duration for a segment in seconds max_segment_duration: Maximum duration for a segment in seconds Returns: List of segment dictionaries with keys: - 'text': The lyrical text for this segment - 'start': Start time in seconds - 'end': End time in seconds - 'words': List of word-level timestamps (if available) """ if not transcription_result or 'segments' not in transcription_result: return [] raw_segments = transcription_result['segments'] if not raw_segments: return [] # First, merge very short segments and split very long ones processed_segments = [] for segment in raw_segments: duration = segment.get('end', 0) - segment.get('start', 0) text = segment.get('text', '').strip() if duration < min_segment_duration: # Try to merge with previous segment if it exists and won't exceed max duration if (processed_segments and (processed_segments[-1]['end'] - processed_segments[-1]['start'] + duration) <= max_segment_duration): # Merge with previous segment processed_segments[-1]['text'] += ' ' + text processed_segments[-1]['end'] = segment.get('end', processed_segments[-1]['end']) if 'words' in segment and 'words' in processed_segments[-1]: processed_segments[-1]['words'].extend(segment['words']) else: # Add as new segment even if short processed_segments.append({ 'text': text, 'start': segment.get('start', 0), 'end': segment.get('end', 0), 'words': segment.get('words', []) }) elif duration > max_segment_duration: # Split long segments at natural break points split_segments = _split_long_segment(segment, max_segment_duration) processed_segments.extend(split_segments) else: # Duration is just right processed_segments.append({ 'text': text, 'start': segment.get('start', 0), 'end': segment.get('end', 0), 'words': segment.get('words', []) }) # Second pass: apply intelligent segmentation based on content final_segments = _apply_intelligent_segmentation(processed_segments, max_segment_duration) # Ensure no empty segments final_segments = [seg for seg in final_segments if seg['text'].strip()] return final_segments def _split_long_segment(segment: Dict[str, Any], max_duration: float) -> List[Dict[str, Any]]: """ Split a long segment into smaller ones at natural break points. """ text = segment.get('text', '').strip() words = segment.get('words', []) start_time = segment.get('start', 0) end_time = segment.get('end', 0) duration = end_time - start_time if not words or duration <= max_duration: return [segment] # Try to split at punctuation marks or word boundaries split_points = [] # Find punctuation-based split points for i, word in enumerate(words): word_text = word.get('word', '').strip() if re.search(r'[.!?;,:]', word_text): split_points.append(i) # If no punctuation, split at word boundaries roughly evenly if not split_points: target_splits = int(duration / max_duration) words_per_split = len(words) // (target_splits + 1) split_points = [i * words_per_split for i in range(1, target_splits + 1) if i * words_per_split < len(words)] if not split_points: return [segment] # Create segments from split points segments = [] last_idx = 0 for split_idx in split_points: if split_idx >= len(words): continue segment_words = words[last_idx:split_idx + 1] if segment_words: segments.append({ 'text': ' '.join([w.get('word', '') for w in segment_words]).strip(), 'start': segment_words[0].get('start', start_time), 'end': segment_words[-1].get('end', end_time), 'words': segment_words }) last_idx = split_idx + 1 # Add remaining words as final segment if last_idx < len(words): segment_words = words[last_idx:] segments.append({ 'text': ' '.join([w.get('word', '') for w in segment_words]).strip(), 'start': segment_words[0].get('start', start_time), 'end': segment_words[-1].get('end', end_time), 'words': segment_words }) return segments def _apply_intelligent_segmentation(segments: List[Dict[str, Any]], max_duration: float) -> List[Dict[str, Any]]: """ Apply intelligent segmentation rules based on lyrical content and timing. """ if not segments: return [] final_segments = [] current_segment = None for segment in segments: text = segment['text'].strip() # Skip empty segments if not text: continue # If no current segment, start a new one if current_segment is None: current_segment = segment.copy() continue # Check if we should merge with current segment should_merge = _should_merge_segments(current_segment, segment, max_duration) if should_merge: # Merge segments current_segment['text'] += ' ' + segment['text'] current_segment['end'] = segment['end'] if 'words' in segment and 'words' in current_segment: current_segment['words'].extend(segment['words']) else: # Finalize current segment and start new one final_segments.append(current_segment) current_segment = segment.copy() # Add the last segment if current_segment is not None: final_segments.append(current_segment) return final_segments def _should_merge_segments(current: Dict[str, Any], next_seg: Dict[str, Any], max_duration: float) -> bool: """ Determine if two segments should be merged based on content and timing. """ # Check duration constraint merged_duration = next_seg['end'] - current['start'] if merged_duration > max_duration: return False current_text = current['text'].strip() next_text = next_seg['text'].strip() # Don't merge if current segment ends with strong punctuation if re.search(r'[.!?]$', current_text): return False # Merge if current segment is very short (likely incomplete phrase) if len(current_text.split()) < 3: return True # Merge if next segment starts with a lowercase word (continuation) if next_text and next_text[0].islower(): return True # Merge if there's a short gap between segments (< 0.5 seconds) gap = next_seg['start'] - current['end'] if gap < 0.5: return True # Don't merge by default return False def get_segment_info(segments: List[Dict[str, Any]]) -> Dict[str, Any]: """ Get summary information about the segments. Args: segments: List of segment dictionaries Returns: Dictionary with segment statistics """ if not segments: return { 'total_segments': 0, 'total_duration': 0, 'average_duration': 0, 'shortest_duration': 0, 'longest_duration': 0 } durations = [seg['end'] - seg['start'] for seg in segments] total_duration = segments[-1]['end'] - segments[0]['start'] if segments else 0 return { 'total_segments': len(segments), 'total_duration': total_duration, 'average_duration': sum(durations) / len(durations), 'shortest_duration': min(durations), 'longest_duration': max(durations), 'segments_preview': [{'text': seg['text'][:50] + '...', 'duration': seg['end'] - seg['start']} for seg in segments[:5]] }