Spaces:
Running
Running
import os | |
import subprocess | |
import json | |
def stitch_and_caption( | |
segment_videos, | |
audio_path, | |
transcription_segments, | |
template_name, | |
work_dir=".", | |
crossfade_duration=0.25 | |
): | |
""" | |
Stitch video segments with crossfade transitions, add original audio, and overlay kinetic captions. | |
Args: | |
segment_videos (list): List of file paths for the video segments. | |
audio_path (str): Path to the original audio file. | |
transcription_segments (list): The list of segment dictionaries from segment.py, including text and word timestamps. | |
template_name (str): The name of the PyCaps template to use. | |
work_dir (str): The working directory for temporary and final files. | |
crossfade_duration (float): Duration of crossfade transitions in seconds (0 for hard cuts). | |
Returns: | |
str: The path to the final subtitled video. | |
""" | |
if not segment_videos: | |
raise RuntimeError("No video segments to stitch.") | |
stitched_path = os.path.join(work_dir, "stitched.mp4") | |
final_path = os.path.join(work_dir, "final_video.mp4") | |
# 1. Stitch video segments together with crossfades using ffmpeg | |
print("Stitching video segments with crossfades...") | |
try: | |
# Get accurate durations for each video segment using ffprobe | |
durations = [_get_video_duration(seg_file) for seg_file in segment_videos] | |
cross_dur = crossfade_duration # Crossfade duration in seconds | |
# Handle the case where crossfade is disabled (hard cuts) | |
if cross_dur <= 0: | |
# Use concat demuxer for hard cuts (more reliable for exact segment timing) | |
concat_file = os.path.join(work_dir, "concat_list.txt") | |
with open(concat_file, "w") as f: | |
for seg_file in segment_videos: | |
f.write(f"file '{os.path.abspath(seg_file)}'\n") | |
# Run ffmpeg with concat demuxer | |
cmd = [ | |
"ffmpeg", "-y", | |
"-f", "concat", | |
"-safe", "0", | |
"-i", concat_file, | |
"-i", audio_path, | |
"-c:v", "copy", # Copy video stream without re-encoding for speed | |
"-c:a", "aac", | |
"-b:a", "192k", | |
"-map", "0:v", | |
"-map", "1:a", | |
"-shortest", | |
stitched_path | |
] | |
subprocess.run(cmd, check=True, capture_output=True, text=True) | |
else: | |
# Build the complex filter string for ffmpeg with crossfades | |
inputs = [] | |
filter_complex_parts = [] | |
stream_labels = [] | |
# Prepare inputs and initial stream labels | |
for i, seg_file in enumerate(segment_videos): | |
inputs.extend(["-i", seg_file]) | |
stream_labels.append(f"[{i}:v]") | |
# If only one video, no stitching needed, just prep for subtitling | |
if len(segment_videos) == 1: | |
final_video_stream = "[0:v]" | |
filter_complex_str = f"[0:v]format=yuv420p[video]" | |
else: | |
# Sequentially chain xfade filters | |
last_stream_label = stream_labels[0] | |
current_offset = 0.0 | |
for i in range(len(segment_videos) - 1): | |
current_offset += durations[i] - cross_dur | |
next_stream_label = f"v{i+1}" | |
filter_complex_parts.append( | |
f"{last_stream_label}{stream_labels[i+1]}" | |
f"xfade=transition=fade:duration={cross_dur}:offset={current_offset}" | |
f"[{next_stream_label}]" | |
) | |
last_stream_label = f"[{next_stream_label}]" | |
final_video_stream = last_stream_label | |
filter_complex_str = ";".join(filter_complex_parts) | |
filter_complex_str += f";{final_video_stream}format=yuv420p[video]" | |
# Construct the full ffmpeg command | |
cmd = ["ffmpeg", "-y"] | |
cmd.extend(inputs) | |
cmd.extend(["-i", audio_path]) # Add original audio as the last input | |
cmd.extend([ | |
"-filter_complex", filter_complex_str, | |
"-map", "[video]", # Map the final video stream | |
"-map", f"{len(segment_videos)}:a", # Map the audio stream | |
"-c:v", "libx264", | |
"-crf", "18", | |
"-preset", "fast", | |
"-c:a", "aac", | |
"-b:a", "192k", | |
"-shortest", # Finish encoding when the shortest stream ends | |
stitched_path | |
]) | |
subprocess.run(cmd, check=True, capture_output=True, text=True) | |
except subprocess.CalledProcessError as e: | |
print("Error during ffmpeg stitching:") | |
print("FFMPEG stdout:", e.stdout) | |
print("FFMPEG stderr:", e.stderr) | |
raise RuntimeError("FFMPEG stitching failed.") from e | |
# 2. Use PyCaps to render captions on the stitched video | |
print("Overlaying kinetic subtitles...") | |
# Save the real transcription data to a JSON file for PyCaps | |
transcription_json_path = os.path.join(work_dir, "transcription_for_pycaps.json") | |
_save_whisper_json(transcription_segments, transcription_json_path) | |
# Run pycaps render command | |
try: | |
pycaps_cmd = [ | |
"pycaps", "render", | |
"--input", stitched_path, | |
"--template", os.path.join("templates", template_name), | |
"--whisper-json", transcription_json_path, | |
"--output", final_path | |
] | |
subprocess.run(pycaps_cmd, check=True, capture_output=True, text=True) | |
except FileNotFoundError: | |
raise RuntimeError("`pycaps` command not found. Make sure pycaps is installed correctly (e.g., `pip install git+https://github.com/francozanardi/pycaps.git`).") | |
except subprocess.CalledProcessError as e: | |
print("Error during PyCaps subtitle rendering:") | |
print("PyCaps stdout:", e.stdout) | |
print("PyCaps stderr:", e.stderr) | |
raise RuntimeError("PyCaps rendering failed.") from e | |
return final_path | |
def _get_video_duration(file_path): | |
"""Get video duration in seconds using ffprobe.""" | |
try: | |
cmd = [ | |
"ffprobe", "-v", "error", | |
"-select_streams", "v:0", | |
"-show_entries", "format=duration", | |
"-of", "default=noprint_wrappers=1:nokey=1", | |
file_path | |
] | |
output = subprocess.check_output(cmd, text=True).strip() | |
return float(output) | |
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e: | |
print(f"Warning: Could not get duration for {file_path}. Error: {e}. Falling back to 0.0.") | |
return 0.0 | |
def _save_whisper_json(transcription_segments, json_path): | |
""" | |
Saves the transcription segments into a Whisper-formatted JSON file for PyCaps. | |
Args: | |
transcription_segments (list): A list of segment dictionaries, each containing | |
'start', 'end', 'text', and 'words' keys. | |
json_path (str): The file path to save the JSON data. | |
""" | |
print(f"Saving transcription to {json_path} for subtitling...") | |
# The structure pycaps expects is a dictionary with a "segments" key, | |
# which contains the list of segment dictionaries. | |
output_data = { | |
"text": " ".join([seg.get('text', '') for seg in transcription_segments]), | |
"segments": transcription_segments, | |
"language": "en" | |
} | |
try: | |
with open(json_path, 'w', encoding='utf-8') as f: | |
json.dump(output_data, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
raise RuntimeError(f"Failed to write transcription JSON file at {json_path}") from e |