|
import constants |
|
import os |
|
from PIL import Image |
|
from gradio_client import Client |
|
import moviepy.editor as mp |
|
from structured_output_extractor import StructuredOutputExtractor |
|
from response_schemas import ScenesResponseSchema |
|
from typing import List, Dict |
|
from natsort import natsorted |
|
|
|
|
|
|
|
def get_scenes(text_script: str):
    """Ask the LLM to split ``text_script`` into scenes with image prompts.

    Builds a prompt that embeds the estimated read time (used by the model
    to budget ~12 image prompts per minute of audio), runs it through the
    structured-output extractor, and returns the parsed result as a dict.
    """
    estimated_read_time = calculate_read_time(text_script)
    scene_prompt = f"""
    ROLE: Story to Scene Generator
    Tasks: For the given story
    1. Read it Completely and Understand the Complete Context
    2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
    3. Never Describe complete scene in a single image prompt use multiple prompts
    RULE OF THUMB: 12 image prompts / 1 min audio (Note: You will be provided with estimated read Time for each story or script)

    here is the Estimated Read Time of the complete story: {estimated_read_time}\n\n
    and Here is the Complete Story: {text_script}
    """
    extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    return extractor.extract(scene_prompt).model_dump()
|
|
|
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str:
    """Generate per-scene audio and image assets under a fresh video folder.

    Args:
        scenes: Mapping with a "scenes" list; each scene dict is expected to
            carry "text" (narration) and "image_prompts" (list of prompts).
        language: Language code forwarded to the TTS service.
        speaker: Speaker/voice name forwarded to the TTS service.
        base_path: Root directory holding one ``video_<n>`` folder per run.

    Returns:
        Path of the created video folder on success. On failure returns a
        dict ``{"error": message}`` — kept for backward compatibility with
        callers that check ``"error" in result`` (note the annotated ``str``
        return type is therefore not strictly accurate on the error path).
    """
    try:
        os.makedirs(base_path, exist_ok=True)

        scenes_list = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scenes_list)}")

        # Pick the first unused video_<n> name. The previous scheme
        # (len(os.listdir(base_path)) + 1) could collide with an existing
        # folder after deletions or when base_path holds unrelated files.
        index = len(os.listdir(base_path)) + 1
        while os.path.exists(os.path.join(base_path, f"video_{index}")):
            index += 1
        video_folder = os.path.join(base_path, f"video_{index}")
        os.makedirs(video_folder)

        images_folder = os.path.join(video_folder, "images")
        audio_folder = os.path.join(video_folder, "audio")
        os.makedirs(images_folder, exist_ok=True)
        os.makedirs(audio_folder, exist_ok=True)

        for scene_count, scene in enumerate(scenes_list):
            text: str = scene.get("text", "")
            image_prompts: List[str] = scene.get("image_prompts", [])

            scene_images_folder = os.path.join(images_folder, f"scene_{scene_count + 1}")
            os.makedirs(scene_images_folder, exist_ok=True)

            # Narration first: a scene whose audio fails is skipped entirely,
            # so no orphaned images are generated for it.
            audio_path = os.path.join(audio_folder, f"scene_{scene_count + 1}.mp3")
            audio_result = generate_audio(text, language, speaker, path=audio_path)
            if "error" in audio_result:
                print(f"Error generating audio for scene {scene_count + 1}: {audio_result['error']}")
                continue

            # Generate every image for the scene; individual failures are
            # logged and skipped rather than aborting the whole scene.
            image_paths = []
            for count, prompt in enumerate(image_prompts):
                image_path = os.path.join(scene_images_folder, f"scene_{scene_count + 1}_image_{count + 1}.png")
                image_result = generate_image(prompt=prompt, path=image_path)
                if "error" in image_result:
                    print(f"Error generating image {count + 1} for scene {scene_count + 1}: {image_result['error']}")
                else:
                    image_paths.append(image_path)

            print(f"Scene: {scene_count + 1}\t No of Images in Scene {scene_count + 1}: {len(image_paths)}")

        return video_folder

    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}
|
|
|
|
|
def generate_audio(text, language_code, speaker, path='test_audio.mp3'):
    """Synthesize narration for ``text`` and store it at ``path``.

    Delegates to the public ``habib926653/Multilingual-TTS`` Gradio space
    and copies the produced file to the requested location.

    Returns ``{"audio_file": path}`` on success, ``{"error": message}``
    when anything goes wrong.
    """
    try:
        tts_client = Client("habib926653/Multilingual-TTS")
        prediction = tts_client.predict(
            text=text,
            language_code=language_code,
            speaker=speaker,
            api_name="/text_to_speech_edge"
        )

        # The space returns a tuple; index 1 holds the generated audio file.
        generated_file = prediction[1]

        # Copy the remote client's temp file to the caller-chosen path.
        with open(generated_file, 'rb') as src, open(path, 'wb') as dst:
            dst.write(src.read())

        return {"audio_file": path}

    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}
|
|
|
|
|
def generate_image(prompt, path='test_image.png'):
    """Generate a 1280x720 image for ``prompt`` and save it to ``path``.

    Returns ``{"image_file": path}`` on success or ``{"error": message}``
    on failure, mirroring generate_audio so callers can uniformly check
    for an "error" key.
    """
    try:
        client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)

        result = client.predict(
            prompt=prompt,
            width=1280,
            height=720,
            api_name="/generate_image"
        )

        # ``result`` is a temp file produced by the remote space; persist a
        # copy at the caller-chosen location.
        image = Image.open(result)
        image.save(path)

        # Bug fix: previously this returned the raw temp path (a bare str),
        # so the caller's `"error" in image_result` check was a substring
        # scan over a filesystem path — a false-positive hazard, and
        # inconsistent with generate_audio's dict contract.
        return {"image_file": path}

    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}
|
|
|
|
|
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """Assemble the final video from per-scene images and narration audio.

    Expects ``video_folder`` to contain ``audio/scene_<n>.mp3`` files and
    matching ``images/scene_<n>/`` folders, as produced by
    generate_video_assets. Returns the written output path, or None when
    no scene could be assembled.
    """
    audio_dir = os.path.join(video_folder, "audio")
    images_dir = os.path.join(video_folder, "images")
    scene_clips = []

    # Walk scene folders in natural order (scene_2 before scene_10).
    for scene_name in natsorted(os.listdir(images_dir)):
        scene_path = os.path.join(images_dir, scene_name)
        if not os.path.isdir(scene_path):
            continue

        audio_path = os.path.join(audio_dir, f"{scene_name}.mp3")
        if not os.path.exists(audio_path):
            print(f"Warning: Audio file {audio_path} not found. Skipping scene {scene_name}.")
            continue

        frames = natsorted(
            os.path.join(scene_path, entry)
            for entry in os.listdir(scene_path)
            if entry.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not frames:
            print(f"Warning: No images found in {scene_path}. Skipping scene {scene_name}.")
            continue

        narration = mp.AudioFileClip(audio_path)
        # Spread the narration evenly across the scene's images.
        seconds_per_frame = narration.duration / len(frames)
        stills = [mp.ImageClip(frame).set_duration(seconds_per_frame) for frame in frames]
        scene_clips.append(
            mp.concatenate_videoclips(stills, method="compose").set_audio(narration)
        )

    if not scene_clips:
        print("Error: No valid scenes processed.")
        return None

    final_video = mp.concatenate_videoclips(scene_clips, method="compose")
    output_path = os.path.join(video_folder, output_filename)
    final_video.write_videofile(output_path, fps=24, codec='libx264')

    return output_path
|
|
|
|
|
def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """Estimate how long it takes to read a given text.

    Args:
        text (str): The input text to calculate reading time for.
        words_per_minute (int): Average reading speed in words per minute.
            Default is 155 (an uneducated guess).

    Returns:
        str: A human-readable duration in seconds, minutes, or hours, or an
        error message for invalid input / unexpected failures.
    """
    try:
        # Reject None, empty strings, and non-string inputs up front.
        if not text or not isinstance(text, str):
            return "Invalid input: Text must be a non-empty string."

        word_count = len(text.split())
        total_seconds = int((word_count / words_per_minute) * 60)

        # Split the (truncated) total into h/m/s components.
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        if minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        return f"Reading time: {seconds} second(s)."

    except Exception as e:
        return f"An error occurred: {e}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: synthesize Urdu narration for a short sample story.
    sample_story = """
    In a quiet village, a young girl named Lily discovered a hidden garden.
    Every flower in the garden glowed with a magical light, revealing secrets of the past.
    Lily knew she had found something truly extraordinary.
    """
    generate_audio(sample_story, "Urdu", "Asad")
|
|