import subprocess

import cv2
import gradio as gr
import moviepy.editor as mp
import numpy as np
import torch
from sentence_transformers import SentenceTransformer, util
from TTS.api import TTS
from yt_dlp import YoutubeDL
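
# YouTube source used as the video_url for every dataset entry below.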
youtube_url = "https://www.youtube.com/@TheGodHouseCentre"
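
# Coqui TTS: the English VCTK VITS model (multi-speaker); runs on the GPU when available.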
tts = TTS(model_name="tts_models/en/vctk/vits", progress_bar=False, gpu=torch.cuda.is_available())
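
# Sentence-embedding model used to match user queries against the stored questions.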
text_model = SentenceTransformer("all-MiniLM-L6-v2")
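
# Minimal question/answer dataset; extend this list with more entries as needed.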
dataset = [
    {"question": "Hello", "answer": "Hi there! How can I help you?", "video_url": youtube_url}
]
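
# Pre-compute embeddings for all stored questions once at startup.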
questions = [item["question"] for item in dataset]
question_embeddings = text_model.encode(questions, convert_to_tensor=True)
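

# Return the dataset entry whose question is most similar (by cosine similarity) to the query.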
def retrieve_answer(query):
    query_emb = text_model.encode(query, convert_to_tensor=True)
    cos_scores = util.cos_sim(query_emb, question_embeddings)[0]
    best_idx = int(torch.argmax(cos_scores))  # convert the tensor index to a plain int for list indexing
    return dataset[best_idx]
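

# Synthesize the answer text to a WAV file with Coqui TTS.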
def generate_audio(text):
    audio_path = "/tmp/temp_audio.wav"
    # The VCTK VITS model is multi-speaker, so a speaker must be selected;
    # the first available speaker is used here as a default.
    tts.tts_to_file(text=text, file_path=audio_path, speaker=tts.speakers[0])
    return audio_path
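

# Grab a single frame from the YouTube source. This assumes the URL resolves to a
# single playable video (not a channel or playlist page) and that ffmpeg is on PATH.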
def stream_youtube_frame(url):
    ydl_opts = {'format': 'best', 'quiet': True, 'noplaylist': True}
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        stream_url = info['url']

    # Decode one raw RGB frame via ffmpeg, scaled to a fixed size so the
    # number of bytes per frame is known in advance.
    width, height = 640, 360
    frame_size = width * height * 3
    process = subprocess.Popen(
        ['ffmpeg', '-i', stream_url, '-vf', f'scale={width}:{height}',
         '-f', 'image2pipe', '-pix_fmt', 'rgb24', '-vcodec', 'rawvideo', '-'],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL
    )
    raw_frame = process.stdout.read(frame_size)
    process.terminate()
    if len(raw_frame) < frame_size:
        raise RuntimeError("Could not read a full frame from the stream")
    frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
    return frame
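

# Combine the captured frame (as a still image) with the synthesized speech into an MP4.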
def generate_video_with_audio(source_frame, driving_audio):
    tmp_frame = "/tmp/temp_frame.jpg"
    tmp_output = "/tmp/output_video.mp4"
    cv2.imwrite(tmp_frame, cv2.cvtColor(source_frame, cv2.COLOR_RGB2BGR))
    audio = mp.AudioFileClip(driving_audio)
    # Hold the still frame for the full length of the synthesized speech.
    clip = mp.ImageClip(tmp_frame).set_duration(audio.duration)
    clip = clip.set_audio(audio)
    # H.264 video with AAC audio so the MP4 plays in the browser-based Gradio player.
    clip.write_videofile(tmp_output, fps=24, codec="libx264", audio_codec="aac",
                         verbose=False, logger=None)
    return tmp_output
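

# End-to-end pipeline: retrieve an answer, synthesize speech, grab a frame, render the video.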
def answer_question(query):
    row = retrieve_answer(query)
    audio_file = generate_audio(row["answer"])
    frame = stream_youtube_frame(row["video_url"])
    video_file = generate_video_with_audio(frame, audio_file)
    return video_file
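

# Gradio UI: a text box in, a rendered video out.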
iface = gr.Interface(
    fn=answer_question,
    inputs=gr.Textbox(label="Ask a question"),
    outputs=gr.Video(label="Generated Video"),
    title="AI Video/Audio Generator",
    description="Ask a question and get a video+audio response generated from YouTube frames with TTS."
)
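

# Serve on all interfaces at port 7860.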
if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)