Audio2KineticVid / utils /segment.py
doodle-med's picture
Upload complete audio-to-kinetic-video application with all dependencies and utilities
9fa4d05
"""
Audio segment processing for creating meaningful lyric segments for video generation.
This module takes Whisper transcription results and intelligently segments them
at natural pause points for synchronized video scene changes.
"""
import re
from typing import List, Dict, Any
def _normalized_segment(segment: Dict[str, Any]) -> Dict[str, Any]:
    """Return a fresh segment dict restricted to the keys this module uses.

    The word list is copied so that later merges never write into the
    caller's transcription data.
    """
    return {
        'text': (segment.get('text') or '').strip(),
        'start': segment.get('start', 0),
        'end': segment.get('end', 0),
        'words': list(segment.get('words') or []),
    }


def segment_lyrics(transcription_result: Dict[str, Any], min_segment_duration: float = 2.0, max_segment_duration: float = 8.0) -> List[Dict[str, Any]]:
    """
    Segment the transcription into meaningful chunks for video generation.

    Works in two passes. The first pass normalizes each raw segment, folds
    segments shorter than ``min_segment_duration`` into their predecessor
    (when the merged span still fits in ``max_segment_duration``) and hands
    segments longer than ``max_segment_duration`` to ``_split_long_segment``.
    The second pass runs ``_apply_intelligent_segmentation`` over the result.
    Segments whose text is empty are dropped from the final list.

    The input transcription is never modified: every segment dict and word
    list placed in the output is a copy.

    Args:
        transcription_result: Dictionary from Whisper transcription containing 'segments'
        min_segment_duration: Minimum duration for a segment in seconds
        max_segment_duration: Maximum duration for a segment in seconds

    Returns:
        List of segment dictionaries with keys:
        - 'text': The lyrical text for this segment
        - 'start': Start time in seconds
        - 'end': End time in seconds
        - 'words': List of word-level timestamps (if available)
    """
    if not transcription_result or 'segments' not in transcription_result:
        return []

    raw_segments = transcription_result['segments']
    if not raw_segments:
        return []

    # First pass: merge very short segments and split very long ones.
    processed_segments: List[Dict[str, Any]] = []
    for raw in raw_segments:
        segment = _normalized_segment(raw)
        duration = segment['end'] - segment['start']

        if duration < min_segment_duration:
            previous = processed_segments[-1] if processed_segments else None
            merged_end = raw.get('end', previous['end']) if previous is not None else segment['end']
            # Measure the merged span from the previous start to this end so
            # that a silent gap between the two counts against the budget.
            if previous is not None and merged_end - previous['start'] <= max_segment_duration:
                previous['text'] = ' '.join(
                    part for part in (previous['text'], segment['text']) if part
                )
                previous['end'] = merged_end
                previous['words'].extend(segment['words'])
            else:
                # Nothing suitable to merge into: keep it even though it is short.
                processed_segments.append(segment)
        elif duration > max_segment_duration:
            # Normalize the pieces as well, so they never alias the input.
            processed_segments.extend(
                _normalized_segment(piece)
                for piece in _split_long_segment(raw, max_segment_duration)
            )
        else:
            # Duration is already within bounds.
            processed_segments.append(segment)

    # Second pass: content-aware merging of neighbouring segments.
    final_segments = _apply_intelligent_segmentation(processed_segments, max_segment_duration)

    # Ensure no empty segments.
    return [seg for seg in final_segments if (seg.get('text') or '').strip()]
def _split_long_segment(segment: Dict[str, Any], max_duration: float) -> List[Dict[str, Any]]:
    """
    Split a long segment into smaller ones at natural break points.

    Split points are taken after every word that contains one of
    ``. ! ? ; , :`` (the final word is ignored, since breaking after it would
    not split anything). When no such word exists, the words are divided into
    roughly equal runs, one more run than ``int(duration / max_duration)``.

    Splitting on punctuation does not guarantee that every piece is shorter
    than ``max_duration``; it only guarantees more than one piece whenever a
    break point exists.

    Args:
        segment: Segment dict with optional 'text', 'start', 'end' and 'words'
            keys; each word is a dict with optional 'word', 'start', 'end'.
        max_duration: Duration in seconds above which the segment is split.

    Returns:
        List of new segment dicts with 'text', 'start', 'end' and 'words'.
        A single-element list is returned when the segment has no word-level
        data, is not longer than ``max_duration``, or cannot be divided.
    """
    words = segment.get('words') or []
    start_time = segment.get('start', 0)
    end_time = segment.get('end', 0)
    duration = end_time - start_time

    def _piece(chunk: List[Dict[str, Any]]) -> Dict[str, Any]:
        # Word tokens may carry their own leading space, so strip each one
        # before joining to avoid doubled spaces in the text.
        tokens = ((w.get('word') or '').strip() for w in chunk)
        return {
            'text': ' '.join(token for token in tokens if token),
            'start': chunk[0].get('start', start_time),
            'end': chunk[-1].get('end', end_time),
            'words': chunk,
        }

    # Fallback result, in the same normalized shape as the split pieces.
    unsplit = [{
        'text': (segment.get('text') or '').strip(),
        'start': start_time,
        'end': end_time,
        'words': list(words),
    }]

    if not words or duration <= max_duration:
        return unsplit

    # Break after words carrying punctuation, excluding the last word.
    split_points = [
        i for i, word in enumerate(words[:-1])
        if re.search(r'[.!?;,:]', word.get('word') or '')
    ]

    # If no punctuation, split at word boundaries roughly evenly.
    if not split_points:
        target_splits = int(duration / max_duration)
        words_per_split = max(1, len(words) // (target_splits + 1))
        # Each entry is the index of the LAST word of a chunk, hence the -1.
        split_points = [
            i * words_per_split - 1
            for i in range(1, target_splits + 1)
            if i * words_per_split < len(words)
        ]

    if not split_points:
        return unsplit

    # Split points are strictly increasing and stop before the last word, so
    # every chunk, including the trailing remainder, is non-empty.
    pieces = []
    chunk_start = 0
    for split_idx in split_points:
        pieces.append(_piece(words[chunk_start:split_idx + 1]))
        chunk_start = split_idx + 1
    pieces.append(_piece(words[chunk_start:]))
    return pieces
def _apply_intelligent_segmentation(segments: List[Dict[str, Any]], max_duration: float) -> List[Dict[str, Any]]:
    """
    Apply intelligent segmentation rules based on lyrical content and timing.

    Walks the segments in order, keeping one "current" segment open. Each
    following segment is either folded into it (when
    ``_should_merge_segments`` says so) or closes it and becomes the new
    current segment. Segments whose text is blank are skipped.

    The input list and its dicts are left untouched: every emitted segment is
    a copy with its own 'words' list and whitespace-stripped 'text'.

    Args:
        segments: Segment dicts with 'text', 'start', 'end' and optionally 'words'.
        max_duration: Upper bound in seconds passed on to the merge decision.

    Returns:
        New list of merged segment dicts.
    """
    if not segments:
        return []

    def _detached(seg: Dict[str, Any], stripped_text: str) -> Dict[str, Any]:
        # dict.copy() is shallow; give the clone its own word list so that
        # extending it during a merge cannot grow the caller's list.
        clone = seg.copy()
        clone['text'] = stripped_text
        if 'words' in clone:
            clone['words'] = list(clone['words'] or [])
        return clone

    final_segments: List[Dict[str, Any]] = []
    current_segment = None

    for segment in segments:
        text = segment['text'].strip()

        # Skip empty segments.
        if not text:
            continue

        # If no current segment, start a new one.
        if current_segment is None:
            current_segment = _detached(segment, text)
            continue

        if _should_merge_segments(current_segment, segment, max_duration):
            # Fold this segment into the open one.
            current_segment['text'] += ' ' + text
            current_segment['end'] = segment['end']
            if 'words' in segment and 'words' in current_segment:
                current_segment['words'].extend(segment['words'] or [])
        else:
            # Finalize current segment and start a new one.
            final_segments.append(current_segment)
            current_segment = _detached(segment, text)

    # Add the last open segment.
    if current_segment is not None:
        final_segments.append(current_segment)

    return final_segments
def _should_merge_segments(current: Dict[str, Any], next_seg: Dict[str, Any], max_duration: float) -> bool:
    """
    Decide whether ``next_seg`` should be folded into ``current``.

    Rules, applied in order:
      1. Never merge if the combined span (current start to next end)
         would exceed ``max_duration``.
      2. Never merge if the current text ends with ``.``, ``!`` or ``?``.
      3. Merge if the current text has fewer than three words.
      4. Merge if the next text starts with a lowercase character.
      5. Otherwise merge only if the gap between the two is under 0.5 s.
    """
    # Rule 1: hard duration ceiling.
    if next_seg['end'] - current['start'] > max_duration:
        return False

    head = current['text'].strip()
    tail = next_seg['text'].strip()

    # Rule 2: strong punctuation closes the phrase.
    if re.search(r'[.!?]$', head):
        return False

    # Rules 3 and 4: a short fragment or a lowercase continuation.
    is_fragment = len(head.split()) < 3
    is_continuation = bool(tail) and tail[0].islower()
    if is_fragment or is_continuation:
        return True

    # Rule 5: close together in time; anything else stays separate.
    pause = next_seg['start'] - current['end']
    return bool(pause < 0.5)
def get_segment_info(segments: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Get summary information about the segments.

    Args:
        segments: List of segment dictionaries, each with 'text', 'start'
            and 'end' keys.

    Returns:
        Dictionary with segment statistics:
        - 'total_segments': number of segments
        - 'total_duration': end of the last segment minus start of the first
        - 'average_duration', 'shortest_duration', 'longest_duration':
          statistics over the individual segment durations
        - 'segments_preview': for the first five segments, a dict with the
          text (cut to 50 characters, with '...' appended only when it was
          actually cut) and the duration
        For empty input every number is 0 and the preview is an empty list.
    """
    if not segments:
        return {
            'total_segments': 0,
            'total_duration': 0,
            'average_duration': 0,
            'shortest_duration': 0,
            'longest_duration': 0,
            'segments_preview': [],
        }

    preview_chars = 50
    preview_count = 5

    def _preview_text(text: str) -> str:
        # Only mark the text as truncated when something was really cut off.
        if len(text) <= preview_chars:
            return text
        return text[:preview_chars] + '...'

    durations = [seg['end'] - seg['start'] for seg in segments]

    return {
        'total_segments': len(segments),
        # Overall span, which includes any silence between segments.
        'total_duration': segments[-1]['end'] - segments[0]['start'],
        'average_duration': sum(durations) / len(durations),
        'shortest_duration': min(durations),
        'longest_duration': max(durations),
        'segments_preview': [
            {'text': _preview_text(seg['text']), 'duration': span}
            for seg, span in zip(segments[:preview_count], durations)
        ],
    }