doodle-med's picture
Upload complete audio-to-kinetic-video application with all dependencies and utilities
9fa4d05
import os
import subprocess
import json
def stitch_and_caption(
segment_videos,
audio_path,
transcription_segments,
template_name,
work_dir=".",
crossfade_duration=0.25
):
"""
Stitch video segments with crossfade transitions, add original audio, and overlay kinetic captions.
Args:
segment_videos (list): List of file paths for the video segments.
audio_path (str): Path to the original audio file.
transcription_segments (list): The list of segment dictionaries from segment.py, including text and word timestamps.
template_name (str): The name of the PyCaps template to use.
work_dir (str): The working directory for temporary and final files.
crossfade_duration (float): Duration of crossfade transitions in seconds (0 for hard cuts).
Returns:
str: The path to the final subtitled video.
"""
if not segment_videos:
raise RuntimeError("No video segments to stitch.")
stitched_path = os.path.join(work_dir, "stitched.mp4")
final_path = os.path.join(work_dir, "final_video.mp4")
# 1. Stitch video segments together with crossfades using ffmpeg
print("Stitching video segments with crossfades...")
try:
# Get accurate durations for each video segment using ffprobe
durations = [_get_video_duration(seg_file) for seg_file in segment_videos]
cross_dur = crossfade_duration # Crossfade duration in seconds
# Handle the case where crossfade is disabled (hard cuts)
if cross_dur <= 0:
# Use concat demuxer for hard cuts (more reliable for exact segment timing)
concat_file = os.path.join(work_dir, "concat_list.txt")
with open(concat_file, "w") as f:
for seg_file in segment_videos:
f.write(f"file '{os.path.abspath(seg_file)}'\n")
# Run ffmpeg with concat demuxer
cmd = [
"ffmpeg", "-y",
"-f", "concat",
"-safe", "0",
"-i", concat_file,
"-i", audio_path,
"-c:v", "copy", # Copy video stream without re-encoding for speed
"-c:a", "aac",
"-b:a", "192k",
"-map", "0:v",
"-map", "1:a",
"-shortest",
stitched_path
]
subprocess.run(cmd, check=True, capture_output=True, text=True)
else:
# Build the complex filter string for ffmpeg with crossfades
inputs = []
filter_complex_parts = []
stream_labels = []
# Prepare inputs and initial stream labels
for i, seg_file in enumerate(segment_videos):
inputs.extend(["-i", seg_file])
stream_labels.append(f"[{i}:v]")
# If only one video, no stitching needed, just prep for subtitling
if len(segment_videos) == 1:
final_video_stream = "[0:v]"
filter_complex_str = f"[0:v]format=yuv420p[video]"
else:
# Sequentially chain xfade filters
last_stream_label = stream_labels[0]
current_offset = 0.0
for i in range(len(segment_videos) - 1):
current_offset += durations[i] - cross_dur
next_stream_label = f"v{i+1}"
filter_complex_parts.append(
f"{last_stream_label}{stream_labels[i+1]}"
f"xfade=transition=fade:duration={cross_dur}:offset={current_offset}"
f"[{next_stream_label}]"
)
last_stream_label = f"[{next_stream_label}]"
final_video_stream = last_stream_label
filter_complex_str = ";".join(filter_complex_parts)
filter_complex_str += f";{final_video_stream}format=yuv420p[video]"
# Construct the full ffmpeg command
cmd = ["ffmpeg", "-y"]
cmd.extend(inputs)
cmd.extend(["-i", audio_path]) # Add original audio as the last input
cmd.extend([
"-filter_complex", filter_complex_str,
"-map", "[video]", # Map the final video stream
"-map", f"{len(segment_videos)}:a", # Map the audio stream
"-c:v", "libx264",
"-crf", "18",
"-preset", "fast",
"-c:a", "aac",
"-b:a", "192k",
"-shortest", # Finish encoding when the shortest stream ends
stitched_path
])
subprocess.run(cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print("Error during ffmpeg stitching:")
print("FFMPEG stdout:", e.stdout)
print("FFMPEG stderr:", e.stderr)
raise RuntimeError("FFMPEG stitching failed.") from e
# 2. Use PyCaps to render captions on the stitched video
print("Overlaying kinetic subtitles...")
# Save the real transcription data to a JSON file for PyCaps
transcription_json_path = os.path.join(work_dir, "transcription_for_pycaps.json")
_save_whisper_json(transcription_segments, transcription_json_path)
# Run pycaps render command
try:
pycaps_cmd = [
"pycaps", "render",
"--input", stitched_path,
"--template", os.path.join("templates", template_name),
"--whisper-json", transcription_json_path,
"--output", final_path
]
subprocess.run(pycaps_cmd, check=True, capture_output=True, text=True)
except FileNotFoundError:
raise RuntimeError("`pycaps` command not found. Make sure pycaps is installed correctly (e.g., `pip install git+https://github.com/francozanardi/pycaps.git`).")
except subprocess.CalledProcessError as e:
print("Error during PyCaps subtitle rendering:")
print("PyCaps stdout:", e.stdout)
print("PyCaps stderr:", e.stderr)
raise RuntimeError("PyCaps rendering failed.") from e
return final_path
def _get_video_duration(file_path):
"""Get video duration in seconds using ffprobe."""
try:
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
file_path
]
output = subprocess.check_output(cmd, text=True).strip()
return float(output)
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
print(f"Warning: Could not get duration for {file_path}. Error: {e}. Falling back to 0.0.")
return 0.0
def _save_whisper_json(transcription_segments, json_path):
"""
Saves the transcription segments into a Whisper-formatted JSON file for PyCaps.
Args:
transcription_segments (list): A list of segment dictionaries, each containing
'start', 'end', 'text', and 'words' keys.
json_path (str): The file path to save the JSON data.
"""
print(f"Saving transcription to {json_path} for subtitling...")
# The structure pycaps expects is a dictionary with a "segments" key,
# which contains the list of segment dictionaries.
output_data = {
"text": " ".join([seg.get('text', '') for seg in transcription_segments]),
"segments": transcription_segments,
"language": "en"
}
try:
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(output_data, f, ensure_ascii=False, indent=2)
except Exception as e:
raise RuntimeError(f"Failed to write transcription JSON file at {json_path}") from e