Spaces:
Sleeping
Sleeping
| import asyncio, os, uuid, json, subprocess, random, re, textwrap, math | |
| import httpx, edge_tts, numpy as np, soundfile as sf | |
| from pathlib import Path | |
| from PIL import Image, ImageDraw, ImageFont | |
| from fastapi import FastAPI, BackgroundTasks, UploadFile, File | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from typing import List | |
| from urllib.parse import quote | |
| app = FastAPI() | |
| OR_KEYS = [os.environ.get(f"OR_KEY_{i}", "") for i in range(1, 4)] | |
| FREE_MODELS = ["openrouter/free", "openrouter/cypher-alpha:free", "tngtech/deepseek-r1t-chimera:free"] | |
| OUTPUT_DIR = Path("/tmp/shortsai") | |
| OUTPUT_DIR.mkdir(parents=True, exist_ok=True) | |
| jobs: dict = {} | |
| W9, H9 = 1080, 1920 # 9:16 | |
| W16, H16 = 1920, 1080 # 16:9 | |
| # βββ FONTS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_font(size=36, bold=False): | |
| paths = [ | |
| f"/usr/share/fonts/truetype/dejavu/DejaVuSans{'-Bold' if bold else ''}.ttf", | |
| f"/usr/share/fonts/truetype/liberation/LiberationSans{'-Bold' if bold else '-Regular'}.ttf", | |
| "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", | |
| ] | |
| for p in paths: | |
| if Path(p).exists(): | |
| try: return ImageFont.truetype(p, size) | |
| except: pass | |
| return ImageFont.load_default() | |
| # βββ REQUEST ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class VideoRequest(BaseModel): | |
| template: str = "cinematic" # cinematic | iphone_chat | instagram_dm | reddit_story | quiz | |
| title: str = "" | |
| niche: str = "motivation" | |
| tone: str = "energetic" | |
| script_mode: str = "ai" | |
| script_text: str = "" | |
| topic: str = "" | |
| voice: str = "male" | |
| voice2: str = "female" # second voice for chat templates | |
| voice_speed: float = 1.0 | |
| duration: int = 60 | |
| aspect_ratio: str = "9:16" | |
| video_style: str = "fast_cuts" | |
| bg_music: str = "ambient" | |
| use_stock: bool = True | |
| session_id: str = "" | |
| # Chat template specific | |
| person1_name: str = "Alex" | |
| person2_name: str = "Jordan" | |
| # Reddit specific | |
| subreddit: str = "AITA" | |
| reddit_username: str = "throwaway_user" | |
| # Quiz specific | |
| num_questions: int = 5 | |
| # βββ OPENROUTER βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def call_openrouter(prompt: str, system: str = "") -> str: | |
| last = "No keys" | |
| for ki, key in enumerate(OR_KEYS): | |
| if not key.strip(): continue | |
| for model in FREE_MODELS: | |
| try: | |
| async with httpx.AsyncClient(timeout=90) as c: | |
| r = await c.post("https://openrouter.ai/api/v1/chat/completions", | |
| headers={"Authorization": f"Bearer {key.strip()}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": "https://shortsai.hf.space", | |
| "X-Title": "ShortsAI"}, | |
| json={"model": model, | |
| "messages": [{"role":"system","content":system}, | |
| {"role":"user","content":prompt}], | |
| "max_tokens": 3000, "temperature": 0.85}) | |
| if r.status_code == 200: | |
| print(f"β Key{ki+1} {model}") | |
| return r.json()["choices"][0]["message"]["content"] | |
| last = f"Key{ki+1}/{model}: {r.status_code}" | |
| if r.status_code == 429: break | |
| except Exception as e: | |
| last = str(e) | |
| raise Exception(f"All keys failed: {last}") | |
| def clean_json(raw: str) -> str: | |
| raw = raw.strip() | |
| for fence in ["```json","```"]: | |
| if fence in raw: | |
| parts = raw.split(fence) | |
| raw = parts[1] if len(parts)>1 else parts[0] | |
| if "```" in raw: raw = raw.split("```")[0] | |
| break | |
| return raw.strip() | |
| # βββ TTS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| VOICES = { | |
| "male":"en-US-GuyNeural","female":"en-US-JennyNeural", | |
| "deep":"en-US-ChristopherNeural","energetic":"en-US-BrandonNeural", | |
| "soft":"en-US-AriaNeural","british":"en-GB-RyanNeural", | |
| "australian":"en-AU-WilliamNeural","indian":"en-IN-PrabhatNeural", | |
| "whispery":"en-US-AnaNeural","news":"en-US-SteffanNeural", | |
| } | |
| async def tts(text: str, voice: str, path: Path, speed: float = 1.0): | |
| v = VOICES.get(voice, "en-US-GuyNeural") | |
| rate = int((speed-1.0)*100) | |
| rate_s = f"+{rate}%" if rate >= 0 else f"{rate}%" | |
| await edge_tts.Communicate(text, v, rate=rate_s).save(str(path)) | |
| def audio_dur(path: Path) -> float: | |
| r = subprocess.run(["ffprobe","-v","quiet","-print_format","json", | |
| "-show_streams",str(path)], capture_output=True, text=True) | |
| try: | |
| for s in json.loads(r.stdout).get("streams",[]): | |
| if s.get("codec_type")=="audio": return float(s.get("duration",30)) | |
| except: pass | |
| return 30.0 | |
| # βββ BACKGROUND MUSIC βββββββββββββββββββββββββββββββββββββββββββββ | |
| def gen_music(dur: float, mood: str, path: Path, sr=44100): | |
| if mood=="none": return False | |
| n = int(sr*dur); t = np.linspace(0,dur,n,endpoint=False) | |
| a = np.zeros(n,dtype=np.float32) | |
| if mood=="ambient": | |
| lfo=0.5+0.5*np.sin(2*np.pi*0.08*t) | |
| a=(np.sin(2*np.pi*130.8*t)*0.28+np.sin(2*np.pi*196*t)*0.16+np.sin(2*np.pi*261.6*t)*0.10)*lfo | |
| elif mood=="dramatic": | |
| lfo=0.4+0.6*np.abs(np.sin(2*np.pi*0.06*t)) | |
| a=(np.sin(2*np.pi*55*t)*0.35+np.sin(2*np.pi*82.4*t)*0.22+np.sin(2*np.pi*58.3*t)*0.08)*lfo | |
| elif mood=="upbeat": | |
| pulse=(np.sin(2*np.pi*2.0*t)>0).astype(np.float32) | |
| a=(np.sin(2*np.pi*220*t)*0.22+np.sin(2*np.pi*330*t)*0.12)*(0.5+0.5*pulse)+np.sin(2*np.pi*110*t)*0.12 | |
| elif mood=="dark": | |
| lfo=0.3+0.7*np.abs(np.sin(2*np.pi*0.04*t)) | |
| a=(np.sin(2*np.pi*40*t)*0.30+np.sin(2*np.pi*60*t)*0.18+np.random.randn(n).astype(np.float32)*0.025)*lfo | |
| elif mood=="tense": | |
| tr=0.5+0.5*np.sin(2*np.pi*7*t) | |
| a=np.sin(2*np.pi*73.4*t)*0.22*tr+np.sin(2*np.pi*77.8*t)*0.08+np.sin(2*np.pi*146.8*t)*0.12 | |
| fade=min(int(sr*2),n//5) | |
| a[:fade]*=np.linspace(0,1,fade,dtype=np.float32) | |
| a[-fade:]*=np.linspace(1,0,fade,dtype=np.float32) | |
| pk=np.max(np.abs(a)) | |
| if pk>0: a=np.clip(a/pk*0.5,-1,1) | |
| sf.write(str(path),a,sr); return True | |
| # βββ IMAGE HELPERS ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| NICHE_STYLES = { | |
| "tech":"futuristic neon holographic UI, cyberpunk cityscape, blue cyan tones, cinematic 8K", | |
| "finance":"luxury Wall Street office, golden coins and charts, dramatic professional lighting", | |
| "horror":"dark eerie abandoned location, fog, horror film grain, ominous shadows", | |
| "motivation":"epic mountain peak silhouette, golden hour sunrise, vast dramatic sky", | |
| "facts":"colorful vibrant educational illustration, bold graphic design, clean modern style", | |
| "history":"vintage documentary sepia photograph, period-accurate historical setting", | |
| "science":"macro laboratory photography, glowing molecular structures, clean scientific lighting", | |
| "gaming":"epic cinematic game concept art, neon RGB, dramatic hero shot", | |
| "nature":"breathtaking wildlife photography, golden hour, National Geographic style", | |
| "food":"gourmet food close-up, steam rising, warm bokeh kitchen, Michelin star", | |
| "crypto":"bitcoin digital gold coins, blockchain visualization, neon dark background", | |
| "space":"NASA quality nebula and planets, deep space 8K, cosmic dramatic lighting", | |
| "psychology":"surreal dreamlike mindscape, symbolic lighting, conceptual art", | |
| "fitness":"athletic peak performance, cinematic gym lighting, raw energy", | |
| "travel":"breathtaking travel destination, golden hour, wide angle cinematic", | |
| "celebrity":"glamorous luxury lifestyle, magazine quality portrait", | |
| "drama":"cinematic emotional close-up, film noir shadows, dramatic lighting", | |
| "ragebait":"shocking dramatic scene, bold red tones, high contrast provocative imagery", | |
| "conspiracy":"dark evidence board, dim lighting, mysterious shadows", | |
| "comedy":"bright pop art comic book style, funny exaggerated, bold colors", | |
| "animals":"adorable wildlife close-up portrait, soft natural light", | |
| } | |
| NICHE_KW = {"tech":"technology futuristic","finance":"business money","horror":"dark eerie", | |
| "motivation":"motivation success","facts":"education knowledge","history":"history ancient", | |
| "science":"science laboratory","gaming":"gaming neon","nature":"nature landscape", | |
| "food":"food gourmet","crypto":"cryptocurrency bitcoin","space":"space galaxy", | |
| "psychology":"mind psychology","fitness":"fitness gym","travel":"travel adventure", | |
| "celebrity":"luxury glamour","drama":"cinematic dramatic","ragebait":"shocking dramatic", | |
| "conspiracy":"dark mysterious","comedy":"funny colorful","animals":"animals wildlife"} | |
| NICHE_COLOR = { | |
| "horror":"eq=saturation=0.35:contrast=1.45:brightness=-0.06", | |
| "motivation":"eq=saturation=1.6:contrast=1.1:brightness=0.05", | |
| "drama":"eq=saturation=0.75:contrast=1.35","conspiracy":"eq=saturation=0.25:contrast=1.55:brightness=-0.08", | |
| "ragebait":"eq=saturation=1.9:contrast=1.35","space":"eq=saturation=1.4:contrast=1.2:brightness=-0.04", | |
| "finance":"eq=saturation=1.15:contrast=1.1","tech":"eq=saturation=1.2:contrast=1.15", | |
| "nature":"eq=saturation=1.4:contrast=1.05","fitness":"eq=saturation=1.5:contrast=1.25", | |
| "food":"eq=saturation=1.5:contrast=1.1","default":"eq=saturation=1.1:contrast=1.05", | |
| } | |
| async def download_image(prompt: str, path: Path, W: int, H: int): | |
| url = (f"https://image.pollinations.ai/prompt/{quote(prompt)}" | |
| f"?width={W}&height={H}&nologo=true&enhance=true&seed={uuid.uuid4().int%99999}") | |
| for attempt in range(5): | |
| try: | |
| async with httpx.AsyncClient(timeout=120, follow_redirects=True) as c: | |
| r = await c.get(url) | |
| if r.status_code == 200: path.write_bytes(r.content); return | |
| elif r.status_code == 429: | |
| wait=(attempt+1)*12; print(f"Pollinations 429, wait {wait}s"); await asyncio.sleep(wait) | |
| else: raise Exception(f"Pollinations {r.status_code}") | |
| except httpx.TimeoutException: await asyncio.sleep(12) | |
| raise Exception("Pollinations failed 5 retries") | |
| async def fetch_stock(kw: str, path: Path, W: int, H: int) -> bool: | |
| try: | |
| async with httpx.AsyncClient(timeout=30, follow_redirects=True) as c: | |
| r = await c.get(f"https://source.unsplash.com/{W}x{H}/?{quote(kw)}") | |
| if r.status_code==200 and len(r.content)>5000: path.write_bytes(r.content); return True | |
| except: pass | |
| return False | |
| def create_srt(script, dur, path): | |
| words = script.split(); chunk = 6 | |
| chunks = [" ".join(words[i:i+chunk]) for i in range(0,len(words),chunk)] | |
| if not chunks: return | |
| cd = dur/len(chunks) | |
| def ft(t): | |
| h,m,s,ms=int(t//3600),int((t%3600)//60),int(t%60),int((t*1000)%1000) | |
| return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" | |
| path.write_text("\n".join(f"{i+1}\n{ft(i*cd)} --> {ft((i+1)*cd)}\n{c}\n" for i,c in enumerate(chunks))) | |
| def build_image_clip(img: Path, out: Path, W, H, dur, crop_idx, color_filter): | |
| ox,oy=[(0,0),(0.12,0),(0,0.12),(0.06,0.06),(0.12,0.12),(0.06,0)][crop_idx%6] | |
| sw,sh=int(W*1.15),int(H*1.15) | |
| vf=(f"scale={sw}:{sh}:force_original_aspect_ratio=increase," | |
| f"crop={W}:{H}:x={int(ox*W)}:y={int(oy*H)},{color_filter}") | |
| r=subprocess.run(["ffmpeg","-y","-loop","1","-i",str(img),"-t",str(dur), | |
| "-vf",vf,"-r","25","-c:v","libx264","-pix_fmt","yuv420p", | |
| "-preset","ultrafast",str(out)],capture_output=True) | |
| if r.returncode!=0: raise Exception(f"Clip: {r.stderr.decode()[-300:]}") | |
| def concat_clips(paths, out): | |
| valid=[p for p in paths if Path(p).exists() and Path(p).stat().st_size>500] | |
| if not valid: raise Exception("No valid clips to concat") | |
| if len(valid)<len(paths): print(f"β οΈ Skipping {len(paths)-len(valid)} missing clips") | |
| lf=out.parent/"list.txt" | |
| lf.write_text("\n".join(f"file '{p}'" for p in valid)) | |
| r=subprocess.run(["ffmpeg","-y","-f","concat","-safe","0","-i",str(lf),"-c","copy",str(out)],capture_output=True) | |
| if r.returncode!=0: | |
| r2=subprocess.run(["ffmpeg","-y","-f","concat","-safe","0","-i",str(lf), | |
| "-c:v","libx264","-c:a","aac","-preset","ultrafast",str(out)],capture_output=True) | |
| if r2.returncode!=0: raise Exception(f"Concat: {r2.stderr.decode()[-300:]}") | |
| def merge_audio(video, voice, music, srt, output, has_music): | |
| srt_esc=str(srt).replace("\\","/").replace(":","\\:") | |
| sub_filter=(f"subtitles={srt_esc}:force_style='" | |
| "FontName=DejaVu Sans Bold,FontSize=18,PrimaryColour=&H00FFFFFF," | |
| "OutlineColour=&H00000000,BackColour=&H80000000,Outline=2,Shadow=1," | |
| "Bold=1,Alignment=2,MarginV=50'") | |
| if has_music and music and music.exists(): | |
| cmd=["ffmpeg","-y","-i",str(video),"-i",str(voice),"-i",str(music), | |
| "-filter_complex","[1:a]volume=1.0[v];[2:a]volume=0.18[m];[v][m]amix=inputs=2:duration=first[aout]", | |
| "-map","0:v","-map","[aout]","-vf",sub_filter, | |
| "-c:v","libx264","-c:a","aac","-b:a","128k","-shortest","-preset","ultrafast",str(output)] | |
| else: | |
| cmd=["ffmpeg","-y","-i",str(video),"-i",str(voice),"-vf",sub_filter, | |
| "-c:v","libx264","-c:a","aac","-b:a","128k","-shortest","-preset","ultrafast",str(output)] | |
| r=subprocess.run(cmd,capture_output=True) | |
| if r.returncode!=0: | |
| cmd2=(cmd[:cmd.index("-vf")]+cmd[cmd.index("-vf")+2:]) if "-vf" in cmd else cmd | |
| cmd2=[x for x in cmd2 if x!=sub_filter] | |
| r2=subprocess.run(cmd2,capture_output=True) | |
| if r2.returncode!=0: raise Exception(f"Merge: {r2.stderr.decode()[-300:]}") | |
| def get_cut_times(dur, style): | |
| times=[0.0]; t=0.0 | |
| intervals={"fast_cuts":(0.4,0.9),"cinematic":(1.4,2.8),"documentary":(3.0,5.5)} | |
| lo,hi=intervals.get(style,(1.5,3.0)) | |
| while t<dur-0.3: | |
| t+=random.uniform(lo,hi) | |
| if t<dur: times.append(round(t,2)) | |
| return times | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TEMPLATE 1: CINEMATIC (existing enhanced) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def render_cinematic(job_id, req, job_dir, upd): | |
| W,H=(W9,H9) if req.aspect_ratio=="9:16" else (W16,H16) | |
| num_scenes=4 if req.duration<=30 else 6 | |
| img_style=NICHE_STYLES.get(req.niche,"cinematic photorealistic 8K") | |
| color_filter=NICHE_COLOR.get(req.niche,NICHE_COLOR["default"]) | |
| upd(5,"βοΈ AI writing viral script...") | |
| if req.script_mode=="ai": | |
| wps=2.3*req.voice_speed; tw=int(req.duration*wps) | |
| prompt=f"""Write a {req.duration}-second YouTube Shorts viral voiceover script. | |
| Topic: {req.topic} | Niche: {req.niche} | Tone: {req.tone} | Scenes: {num_scenes} | |
| MANDATORY: Script MUST be {tw} words (Β±10%). Count carefully. Fill the full duration. | |
| RULES: Pure spoken words only. No [Scene X], brackets, timestamps. Hook in first 3 words. | |
| Return ONLY JSON: {{"script":"..{tw} words..","scenes":[{{"description":"30-50 word ultra-detailed image prompt with lighting angle mood colors style","index":0}}]}}""" | |
| raw=clean_json(await call_openrouter(prompt,"Return ONLY valid JSON, no markdown.")) | |
| data=json.loads(raw) | |
| script=re.sub(r'\[[\d:]+\]\s*|Scene\s+\d+\s*:?|\[\s*Scene\s+\d+\s*\]','',data["script"],flags=re.I).strip() | |
| raw_scenes=data.get("scenes",[])[:num_scenes] | |
| else: | |
| script=req.script_text.strip() | |
| raw2=clean_json(await call_openrouter( | |
| f"Split into {num_scenes} visual scenes.\nScript:{script}\nReturn JSON array:[{{\"description\":\"30-50 word image prompt\",\"index\":0}}]", | |
| "Return ONLY valid JSON array.")) | |
| parsed=json.loads(raw2); raw_scenes=(parsed if isinstance(parsed,list) else parsed.get("scenes",[]))[:num_scenes] | |
| # Script length check + retry | |
| tw_target=int(req.duration*2.3*req.voice_speed) | |
| if len(script.split())<tw_target*0.5: | |
| upd(12,"β οΈ Script too short, AI retrying...") | |
| r2=clean_json(await call_openrouter( | |
| f"Write a {req.duration}s narration about '{req.topic or script[:80]}'. EXACTLY {tw_target} words. " | |
| f"Return JSON: {{\"script\":\"..words..\",\"scenes\":[{{\"description\":\"image prompt\",\"index\":0}},{{\"description\":\"image prompt\",\"index\":1}},{{\"description\":\"image prompt\",\"index\":2}},{{\"description\":\"image prompt\",\"index\":3}}]}}", | |
| "Return ONLY valid JSON.")) | |
| try: | |
| d2=json.loads(r2); s2=d2.get("script","").strip() | |
| if len(s2.split())>len(script.split()): script=s2; raw_scenes=d2.get("scenes",raw_scenes)[:num_scenes] | |
| except: pass | |
| upd(16,f"β Script ready ({len(script.split())} words). Getting images...") | |
| image_pool=[] | |
| # User uploads | |
| if req.session_id: | |
| ud=OUTPUT_DIR/req.session_id/"uploads" | |
| if ud.exists(): | |
| for ext in ["*.jpg","*.jpeg","*.png","*.webp"]: image_pool.extend(ud.glob(ext)) | |
| # AI images | |
| for i,scene in enumerate(raw_scenes): | |
| desc=scene.get("description",f"scene {i+1}") | |
| p=job_dir/f"ai_{i}.jpg" | |
| upd(16+int(i/len(raw_scenes)*22),f"π¨ AI image {i+1}/{len(raw_scenes)}...") | |
| await download_image(f"{desc}, {img_style}",p,W,H) | |
| image_pool.append(p) | |
| if i<len(raw_scenes)-1: await asyncio.sleep(3) | |
| # Stock | |
| if req.use_stock: | |
| kw=NICHE_KW.get(req.niche,req.niche); fetched=0 | |
| upd(38,f"πΈ Fetching stock images...") | |
| for i in range(min(6,max(2,10-len(image_pool)))): | |
| sp=job_dir/f"stock_{i}.jpg" | |
| if await fetch_stock([kw,req.niche,"cinematic"][i%3],sp,W,H): image_pool.append(sp); fetched+=1 | |
| await asyncio.sleep(1.5) | |
| if not image_pool: raise Exception("No images") | |
| user_imgs=[p for p in image_pool if "uploads" in str(p)] | |
| other_imgs=[p for p in image_pool if "uploads" not in str(p)] | |
| random.shuffle(other_imgs) | |
| image_pool=[]; ui=oi=slot=0 | |
| while oi<len(other_imgs) or ui<len(user_imgs): | |
| if user_imgs and ui<len(user_imgs) and slot%3==0: image_pool.append(user_imgs[ui]); ui+=1 | |
| elif oi<len(other_imgs): image_pool.append(other_imgs[oi]); oi+=1 | |
| elif ui<len(user_imgs): image_pool.append(user_imgs[ui]); ui+=1 | |
| slot+=1 | |
| upd(42,"ποΈ Generating voiceover...") | |
| vp=job_dir/"voice.mp3"; await tts(script,req.voice,vp,req.voice_speed) | |
| dur=audio_dur(vp) | |
| upd(52,"π΅ Generating background music...") | |
| mp=job_dir/"music.wav"; has_mus=gen_music(dur+3,req.bg_music,mp) | |
| upd(58,"π¬ Building clips...") | |
| cuts=get_cut_times(dur,req.video_style) | |
| clips=[] | |
| for i,ts in enumerate(cuts): | |
| te=cuts[i+1] if i+1<len(cuts) else dur | |
| cp=job_dir/f"clip_{i:04d}.mp4" | |
| build_image_clip(image_pool[i%len(image_pool)],cp,W,H,max(0.1,te-ts),i,color_filter) | |
| clips.append(cp) | |
| if i%10==0: upd(58+int(i/len(cuts)*25),f"π¬ {i+1}/{len(cuts)} clips...") | |
| upd(83,"π Stitching..."); cat=job_dir/"cat.mp4" | |
| if len(clips)==1: cat=clips[0] | |
| else: concat_clips(clips,cat) | |
| upd(90,"π§ Final mix..."); srt=job_dir/"caps.srt"; create_srt(script,dur,srt) | |
| out=job_dir/"final.mp4"; merge_audio(cat,vp,mp if has_mus else None,srt,out,has_mus) | |
| return out | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TEMPLATE 2: IPHONE CHAT / INSTAGRAM DM β complete rewrite | |
| # Key fixes: duration control, real IG colors, short messages | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def draw_ig_bubble(draw, img, text, y, W, is_sender, font): | |
| """Draw an Instagram-style bubble. Sender=purple gradient right, receiver=grey left.""" | |
| lines = textwrap.wrap(text, width=42) | |
| line_h = 36 | |
| pad_x, pad_y = 24, 14 | |
| tw = max((draw.textlength(l, font=font) for l in lines), default=60) | |
| bw = min(int(tw) + pad_x * 2, W - 100) | |
| bh = len(lines) * line_h + pad_y * 2 | |
| if is_sender: | |
| bx = W - bw - 24 | |
| # Instagram purple gradient (simulate with solid #8B5CF6 β #EC4899) | |
| # Draw two-tone approximation | |
| for px in range(bw): | |
| ratio = px / bw | |
| r = int(139 + (236-139)*ratio) | |
| g = int(92 + (72 -92 )*ratio) | |
| b = int(246 + (153-246)*ratio) | |
| draw.rectangle([bx+px, y, bx+px+1, y+bh], fill=(r,g,b)) | |
| # Rounded mask (draw white corners to simulate radius) | |
| draw.rounded_rectangle([bx, y, bx+bw, y+bh], radius=18, outline=(r,g,b), width=0) | |
| # Re-draw gradient properly via rounded_rectangle workaround | |
| # Use solid color that matches IG purple | |
| draw.rounded_rectangle([bx, y, bx+bw, y+bh], radius=18, fill=(168, 85, 247)) | |
| tx, ty = bx + pad_x, y + pad_y | |
| for line in lines: | |
| draw.text((tx, ty), line, font=font, fill=(255,255,255)) | |
| ty += line_h | |
| else: | |
| bx = 70 | |
| draw.rounded_rectangle([bx, y, bx+bw, y+bh], radius=18, fill=(38, 38, 38)) | |
| tx, ty = bx + pad_x, y + pad_y | |
| for line in lines: | |
| draw.text((tx, ty), line, font=font, fill=(255,255,255)) | |
| ty += line_h | |
| return bh + 12 | |
| def render_ig_frame(messages_so_far, W, H, p1_name, p2_name, is_iphone=False): | |
| """Render a full Instagram DM or iMessage frame.""" | |
| bg = (0, 0, 0) if not is_iphone else (18, 18, 18) | |
| img = Image.new("RGB", (W, H), bg) | |
| draw = ImageDraw.Draw(img) | |
| # ββ Status bar ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| draw.rectangle([0, 0, W, 44], fill=(0,0,0)) | |
| draw.text((40, 12), "9:41", font=get_font(26, True), fill=(255,255,255)) | |
| draw.text((W-120, 12), "βββ", font=get_font(22), fill=(255,255,255)) | |
| # ββ Header ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| hh = 110 | |
| draw.rectangle([0, 44, W, hh], fill=(0,0,0)) | |
| # Back chevron | |
| draw.text((20, 64), "βΉ", font=get_font(48, True), fill=(168,85,247)) | |
| if is_iphone: | |
| # iMessage: contact centered, green/grey bubbles | |
| draw.text((W//2 - 60, 64), p2_name, font=get_font(32, True), fill=(255,255,255)) | |
| else: | |
| # Instagram: username with @ and purple accent line at top | |
| draw.rectangle([0, 44, W, 47], fill=(168,85,247)) | |
| # Small avatar circle | |
| av_x, av_y = W//2 - 26, 52 | |
| draw.ellipse([av_x, av_y, av_x+52, av_y+52], fill=(80,40,120)) | |
| draw.text((av_x+14, av_y+10), p2_name[0].upper() if p2_name else "A", | |
| font=get_font(28, True), fill=(255,255,255)) | |
| draw.text((W//2 - len(p2_name)*9, 108), p2_name, | |
| font=get_font(28, True), fill=(255,255,255)) | |
| # ββ Thin separator βββββββββββββββββββββββββββββββββββββββββββββ | |
| draw.rectangle([0, hh, W, hh+1], fill=(30,30,30)) | |
| # ββ Messages β render from bottom up so newest at bottom ββββββ | |
| mf = get_font(30) | |
| msg_area_h = H - hh - 20 - 90 # space for messages | |
| # Calculate total height needed | |
| rendered = [] | |
| for msg in messages_so_far: | |
| lines = textwrap.wrap(msg["text"], width=42) | |
| bh = len(lines)*36 + 28 + 12 | |
| rendered.append((msg, bh)) | |
| # Scroll: only show messages that fit from bottom | |
| total_h = sum(h for _, h in rendered) | |
| skip_h = max(0, total_h - msg_area_h) | |
| y = hh + 20 | |
| cum = 0 | |
| for msg, bh in rendered: | |
| cum += bh | |
| if cum <= skip_h: | |
| continue | |
| is_sender = msg["sender"] == "p1" | |
| if is_iphone: | |
| # iMessage: blue sender, grey receiver | |
| lines = textwrap.wrap(msg["text"], width=42) | |
| pad_x, pad_y, line_h = 20, 12, 34 | |
| tw = max((draw.textlength(l, font=mf) for l in lines), default=60) | |
| bw = min(int(tw) + pad_x*2, W-100) | |
| bub_h = len(lines)*line_h + pad_y*2 | |
| bx = W-bw-20 if is_sender else 60 | |
| color = (0,122,255) if is_sender else (38,38,38) | |
| draw.rounded_rectangle([bx,y,bx+bw,y+bub_h], radius=18, fill=color) | |
| ty = y+pad_y | |
| for line in lines: | |
| draw.text((bx+pad_x,ty),line,font=mf,fill=(255,255,255)); ty+=line_h | |
| y += bub_h+12 | |
| else: | |
| y += draw_ig_bubble(draw, img, msg["text"], y, W, is_sender, mf) | |
| if y > H-100: | |
| break | |
| # ββ Input bar ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| bar_y = H - 80 | |
| draw.rectangle([0, bar_y, W, H], fill=(0,0,0)) | |
| draw.rectangle([0, bar_y, W, bar_y+1], fill=(30,30,30)) | |
| if not is_iphone: | |
| # Camera icon left | |
| draw.text((24, bar_y+20), "π·", font=get_font(36), fill=(168,85,247)) | |
| # Input box | |
| draw.rounded_rectangle([90, bar_y+14, W-24, H-14], radius=22, fill=(30,30,30)) | |
| draw.text((110, bar_y+22), "Message...", font=get_font(28), fill=(80,80,80)) | |
| # Heart emoji right | |
| draw.text((W-60, bar_y+20), "β‘", font=get_font(36), fill=(168,85,247)) | |
| else: | |
| draw.rounded_rectangle([60, bar_y+12, W-60, H-12], radius=22, | |
| fill=(30,30,30), outline=(60,60,60), width=1) | |
| draw.text((80, bar_y+20), "iMessage", font=get_font(28), fill=(80,80,80)) | |
| return img | |
| async def render_chat_video(job_id, req, job_dir, upd): | |
| W, H = W9, H9 | |
| is_iphone = req.template == "iphone_chat" | |
| is_ig = req.template == "instagram_dm" | |
| # ββ Duration-aware message count ββββββββββββββββββββββββββββββ | |
| # Each message = ~2.5 words/sec at normal speed, max 12 words | |
| # 30s β ~8 msgs, 60s β ~14 msgs | |
| target_dur = req.duration | |
| # Words per message: short texts = 6-12 words | |
| words_per_msg = 8 | |
| wps = 2.5 * req.voice_speed | |
| secs_per_msg = words_per_msg / wps # ~3.2s per message | |
| num_msgs = max(6, min(20, int(target_dur / secs_per_msg))) | |
| max_words_per_msg = 12 | |
| upd(5, f"βοΈ Writing {num_msgs}-message conversation...") | |
| prompt = f"""Write a {req.tone} conversation between {req.person1_name} and {req.person2_name}. | |
| Topic: {req.topic} | |
| STRICT RULES β this becomes a video: | |
| - Exactly {num_msgs} messages total, alternating p1/p2 | |
| - Each message: MAXIMUM {max_words_per_msg} words. Short texts only. Like real texting. | |
| - No long paragraphs. Keep every message punchy and short. | |
| - Make it emotional, dramatic, engaging β like viral TikTok chat videos | |
| - Escalate tension through the conversation | |
| Return ONLY a JSON array: | |
| [{{"sender":"p1","text":"short message max {max_words_per_msg} words"}},{{"sender":"p2","text":"reply"}}] | |
| Exactly {num_msgs} items. No markdown. No extra text.""" | |
| raw = clean_json(await call_openrouter(prompt, "Return ONLY valid JSON array. No markdown.")) | |
| messages = json.loads(raw) | |
| if not isinstance(messages, list): | |
| raise Exception("Chat parse failed β not a list") | |
| # Enforce word limit per message | |
| messages = messages[:num_msgs] | |
| for m in messages: | |
| words = m.get("text","").split() | |
| if len(words) > max_words_per_msg: | |
| m["text"] = " ".join(words[:max_words_per_msg]) | |
| upd(18, f"ποΈ Recording {len(messages)} voice lines...") | |
| audio_parts = [] | |
| for i, msg in enumerate(messages): | |
| v = req.voice if msg["sender"] == "p1" else req.voice2 | |
| p = job_dir / f"msg_{i:03d}.mp3" | |
| await tts(msg["text"], v, p, req.voice_speed) | |
| audio_parts.append((p, msg)) | |
| if i % 4 == 0: | |
| upd(18 + int(i/len(messages)*20), f"ποΈ Voice {i+1}/{len(messages)}...") | |
| # Check total duration | |
| total_audio = sum(audio_dur(ap) for ap, _ in audio_parts) | |
| print(f"π Total audio: {total_audio:.1f}s (target: {target_dur}s, {len(messages)} msgs)") | |
| upd(38, "π± Rendering Instagram DM frames...") | |
| frame_dir = job_dir/"frames"; frame_dir.mkdir(exist_ok=True) | |
| clip_paths = []; shown = [] | |
| for i, (ap, msg) in enumerate(audio_parts): | |
| shown.append(msg) | |
| clip_dur = round(audio_dur(ap), 3) | |
| # Render frame | |
| frame_img = render_ig_frame(shown, W, H, req.person1_name, req.person2_name, is_iphone) | |
| fp = frame_dir / f"frame_{i:03d}.png" | |
| frame_img.save(str(fp)) | |
| cp = job_dir / f"clip_{i:03d}.mp4" | |
| # ββ Convert audio to WAV for maximum FFmpeg compatibility ββββ | |
| wav_p = job_dir / f"msg_{i:03d}.wav" | |
| rw = subprocess.run([ | |
| "ffmpeg", "-y", "-i", str(ap), | |
| "-c:a", "pcm_s16le", "-ar", "24000", "-ac", "1", str(wav_p) | |
| ], capture_output=True) | |
| audio_src = wav_p if (wav_p.exists() and wav_p.stat().st_size > 100) else ap | |
| clip_ok = False | |
| # Method 1: image + audio β libx264 + aac (clean, no duplicate -t) | |
| r1 = subprocess.run([ | |
| "ffmpeg", "-y", | |
| "-loop", "1", "-i", str(fp), | |
| "-i", str(audio_src), | |
| "-map", "0:v", "-map", "1:a", | |
| "-c:v", "libx264", "-tune", "stillimage", | |
| "-c:a", "aac", "-b:a", "128k", | |
| "-pix_fmt", "yuv420p", "-r", "25", | |
| "-shortest", "-preset", "ultrafast", str(cp) | |
| ], capture_output=True) | |
| if cp.exists() and cp.stat().st_size > 1000: | |
| clip_ok = True | |
| else: | |
| print(f"β οΈ M1 err clip{i}: {r1.stderr.decode('utf-8',errors='replace')[-300:]}") | |
| if not clip_ok: | |
| # Method 2: two-step β silent video, then mux audio | |
| silent = job_dir / f"sil_{i:03d}.mp4" | |
| rs = subprocess.run([ | |
| "ffmpeg", "-y", "-loop", "1", "-t", str(clip_dur), "-i", str(fp), | |
| "-c:v", "libx264", "-tune", "stillimage", | |
| "-pix_fmt", "yuv420p", "-r", "25", "-an", | |
| "-preset", "ultrafast", str(silent) | |
| ], capture_output=True) | |
| if silent.exists() and silent.stat().st_size > 100: | |
| rm = subprocess.run([ | |
| "ffmpeg", "-y", "-i", str(silent), "-i", str(audio_src), | |
| "-c:v", "copy", "-c:a", "aac", "-b:a", "128k", | |
| "-map", "0:v", "-map", "1:a", "-shortest", str(cp) | |
| ], capture_output=True) | |
| if cp.exists() and cp.stat().st_size > 1000: | |
| clip_ok = True | |
| else: | |
| print(f"β οΈ M2b err: {rm.stderr.decode('utf-8',errors='replace')[-300:]}") | |
| else: | |
| print(f"β οΈ M2a err: {rs.stderr.decode('utf-8',errors='replace')[-300:]}") | |
| if not clip_ok: | |
| # Method 3: mpeg4 codec fallback (always in ffmpeg) | |
| r3 = subprocess.run([ | |
| "ffmpeg", "-y", | |
| "-loop", "1", "-i", str(fp), | |
| "-i", str(audio_src), | |
| "-map", "0:v", "-map", "1:a", | |
| "-c:v", "mpeg4", "-q:v", "5", | |
| "-c:a", "aac", "-b:a", "128k", | |
| "-pix_fmt", "yuv420p", "-r", "25", | |
| "-shortest", str(cp) | |
| ], capture_output=True) | |
| if cp.exists() and cp.stat().st_size > 1000: | |
| clip_ok = True | |
| else: | |
| print(f"β οΈ M3 err: {r3.stderr.decode('utf-8',errors='replace')[-300:]}") | |
| if clip_ok: | |
| clip_paths.append(cp) | |
| else: | |
| print(f"β οΈ Clip {i} failed all methods, skipping") | |
| upd(38 + int(i/len(messages)*38), f"π± Frame {i+1}/{len(messages)}...") | |
| if not clip_paths: | |
| raise Exception("No chat clips created. Check FFmpeg codec support.") | |
| upd(76, "π Stitching clips...") | |
| cat = job_dir/"cat.mp4" | |
| if len(clip_paths) == 1: | |
| cat = clip_paths[0] | |
| else: | |
| concat_clips(clip_paths, cat) | |
| upd(88, "π΅ Adding background music...") | |
| total_dur = sum(audio_dur(ap) for ap, _ in audio_parts) | |
| mp = job_dir/"music.wav"; has_mus = gen_music(total_dur + 3, req.bg_music, mp) | |
| out = job_dir/"final.mp4" | |
| if has_mus and mp.exists(): | |
| r = subprocess.run([ | |
| "ffmpeg", "-y", "-i", str(cat), "-i", str(mp), | |
| "-filter_complex", "[1:a]volume=0.15[m];[0:a][m]amix=inputs=2:duration=first[aout]", | |
| "-map", "0:v", "-map", "[aout]", | |
| "-c:v", "libx264", "-c:a", "aac", "-b:a", "128k", "-preset", "ultrafast", str(out) | |
| ], capture_output=True) | |
| if r.returncode != 0: | |
| subprocess.run(["ffmpeg","-y","-i",str(cat),"-c","copy",str(out)], capture_output=True) | |
| else: | |
| subprocess.run(["ffmpeg","-y","-i",str(cat),"-c","copy",str(out)], capture_output=True) | |
| return out | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TEMPLATE 3: REDDIT STORY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def render_reddit_card(title, body_preview, username, sub, votes, W, H): | |
| img=Image.new("RGB",(W,H),(18,18,18)) | |
| draw=ImageDraw.Draw(img) | |
| # Reddit-style card | |
| cy=H//2-350; cw=W-60 | |
| draw.rounded_rectangle([30,cy,30+cw,cy+700],radius=16,fill=(26,26,26)) | |
| draw.rounded_rectangle([30,cy,30+cw,cy+702],radius=16,outline=(60,60,60),width=2) | |
| # Subreddit header | |
| draw.ellipse([52,cy+18,92,cy+58],fill=(255,69,0)) | |
| draw.text((60,cy+26),"r",font=get_font(26,True),fill=(255,255,255)) | |
| draw.text((100,cy+20),f"r/{sub}",font=get_font(28,True),fill=(255,255,255)) | |
| draw.text((100,cy+52),f"Posted by u/{username}",font=get_font(24),fill=(130,130,130)) | |
| # Awards | |
| draw.text((cw-80,cy+30),"πβ¨",font=get_font(28),fill=(255,215,0)) | |
| # Title | |
| tf=get_font(34,True); ty=cy+100 | |
| for line in textwrap.wrap(title,width=38): | |
| draw.text((52,ty),line,font=tf,fill=(215,215,215)); ty+=44 | |
| # Body preview | |
| bf=get_font(28); ty+=10 | |
| for line in textwrap.wrap(body_preview[:200],width=46): | |
| draw.text((52,ty),line,font=bf,fill=(160,160,160)); ty+=36 | |
| # Vote bar | |
| vy=cy+640 | |
| draw.rounded_rectangle([52,vy,52+80,vy+36],radius=18,fill=(255,69,0)) | |
| draw.text((62,vy+4),f"β {votes}",font=get_font(24,True),fill=(255,255,255)) | |
| draw.text((150,vy+8),"π¬ Comments",font=get_font(24),fill=(130,130,130)) | |
| draw.text((330,vy+8),"β¬ Share",font=get_font(24),fill=(130,130,130)) | |
| return img | |
| async def render_reddit_video(job_id, req, job_dir, upd): | |
| W,H=W9,H9 | |
| upd(5,"π AI writing Reddit story...") | |
| wps=2.0*req.voice_speed; tw=int(req.duration*wps) | |
| prompt=f"""Write a compelling Reddit story for r/{req.subreddit}. | |
| Topic: {req.topic} | Target words: {tw} | |
| Write in first-person Reddit style. Include drama, emotion, conflict. | |
| Return ONLY JSON: | |
| {{"title":"AITA/reddit post title (dramatic hook)","story":"full {tw} word story as narration","body_preview":"first 40 words of story","votes":"{random.randint(12,89)}K"}}""" | |
| raw=clean_json(await call_openrouter(prompt,"Return ONLY valid JSON.")) | |
| data=json.loads(raw) | |
| title=data.get("title","Reddit Story"); story=data.get("story","") | |
| body_preview=data.get("body_preview",story[:80]); votes=data.get("votes","42K") | |
| username=req.reddit_username or f"throwaway_{random.randint(1000,9999)}" | |
| upd(22,"π¨ Rendering Reddit card...") | |
| card=render_reddit_card(title,body_preview,username,req.subreddit,votes,W,H) | |
| card_path=job_dir/"reddit_card.png"; card.save(str(card_path)) | |
| upd(35,"ποΈ Generating narration...") | |
| vp=job_dir/"voice.mp3"; await tts(story,req.voice,vp,req.voice_speed) | |
| dur=audio_dur(vp) | |
| upd(48,"π΅ Music..."); mp=job_dir/"music.wav"; has_mus=gen_music(dur+3,req.bg_music,mp) | |
| upd(55,"π¬ Building video...") | |
| # Start with reddit card for 4s, then background with scrolling story | |
| card_clip=job_dir/"card_clip.mp4" | |
| subprocess.run(["ffmpeg","-y","-loop","1","-i",str(card_path),"-t","4", | |
| "-vf",f"scale={W}:{H},eq=saturation=1.1","-r","25", | |
| "-c:v","libx264","-pix_fmt","yuv420p","-preset","ultrafast",str(card_clip)], | |
| capture_output=True) | |
| # Rest of video: dark background with story text rolling (simulate teleprompter) | |
| words=story.split(); wps_actual=len(words)/max(dur-4,1) | |
| frame_dur=2.5; frames=[]; t=4.0 | |
| word_idx=0 | |
| while t<dur: | |
| chunk_words=words[word_idx:word_idx+18]; t+=frame_dur; word_idx=min(word_idx+18,len(words)) | |
| text=" ".join(chunk_words) | |
| fimg=Image.new("RGB",(W,H),(18,18,18)) | |
| fdraw=ImageDraw.Draw(fimg) | |
| # Subreddit watermark top | |
| fdraw.text((40,60),f"r/{req.subreddit}",font=get_font(32,True),fill=(255,69,0)) | |
| fdraw.text((40,100),f"u/{username}",font=get_font(26),fill=(100,100,100)) | |
| # Story text center | |
| ff=get_font(38); ty=H//2-200 | |
| for line in textwrap.wrap(text,width=36): | |
| fdraw.text((50,ty),line,font=ff,fill=(220,220,220)); ty+=52 | |
| fp=job_dir/f"story_frame_{len(frames):04d}.png"; fimg.save(str(fp)); frames.append((fp,frame_dur)) | |
| story_clips=[] | |
| for i,(fp,fd) in enumerate(frames): | |
| cp=job_dir/f"story_clip_{i:04d}.mp4" | |
| subprocess.run(["ffmpeg","-y","-loop","1","-i",str(fp),"-t",str(fd), | |
| "-vf",f"scale={W}:{H}","-r","25","-c:v","libx264", | |
| "-pix_fmt","yuv420p","-preset","ultrafast",str(cp)],capture_output=True) | |
| story_clips.append(cp) | |
| all_clips=[card_clip]+story_clips; cat=job_dir/"cat.mp4" | |
| if len(all_clips)==1: cat=all_clips[0] | |
| else: concat_clips(all_clips,cat) | |
| upd(85,"π§ Final mix..."); srt=job_dir/"caps.srt"; create_srt(story,dur,srt) | |
| out=job_dir/"final.mp4"; merge_audio(cat,vp,mp if has_mus else None,srt,out,has_mus) | |
| return out | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TEMPLATE 4: QUIZ / FACTS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def render_quiz_card(question, answer, reveal, W, H, niche): | |
| NICHE_COLORS={"tech":(0,150,255),"horror":(150,0,0),"motivation":(255,140,0), | |
| "space":(100,0,200),"finance":(0,180,100),"default":(255,69,0)} | |
| accent=NICHE_COLORS.get(niche,NICHE_COLORS["default"]) | |
| img=Image.new("RGB",(W,H),(12,12,15)) | |
| draw=ImageDraw.Draw(img) | |
| # Background gradient effect (simple top strip) | |
| for i in range(300): | |
| alpha=1-i/300; c=tuple(int(x*alpha) for x in accent) | |
| draw.line([(0,i),(W,i)],fill=c) | |
| # "?" or answer icon | |
| if not reveal: | |
| draw.ellipse([W//2-100,200,W//2+100,400],fill=accent) | |
| draw.text((W//2-40,250),"?",font=get_font(120,True),fill=(255,255,255)) | |
| else: | |
| draw.ellipse([W//2-100,200,W//2+100,400],fill=(0,200,100)) | |
| draw.text((W//2-50,250),"β",font=get_font(100,True),fill=(255,255,255)) | |
| # Question | |
| qf=get_font(44,True); qy=440 | |
| for line in textwrap.wrap(question,width=28): | |
| qw=draw.textlength(line,font=qf) | |
| draw.text(((W-qw)//2,qy),line,font=qf,fill=(255,255,255)); qy+=58 | |
| if reveal: | |
| # Answer box | |
| draw.rounded_rectangle([60,qy+40,W-60,qy+200],radius=20,fill=(0,40,0)) | |
| draw.rounded_rectangle([60,qy+40,W-60,qy+202],radius=20,outline=(0,200,100),width=3) | |
| draw.text((80,qy+60),"ANSWER:",font=get_font(28,True),fill=(0,200,100)) | |
| af=get_font(40,True); ay=qy+100 | |
| for line in textwrap.wrap(answer,width=30): | |
| draw.text((80,ay),line,font=af,fill=(255,255,255)); ay+=50 | |
| return img | |
| async def render_quiz_video(job_id, req, job_dir, upd): | |
| W,H=W9,H9 | |
| upd(5,"π§ AI generating quiz questions...") | |
| nq=req.num_questions | |
| prompt=f"""Generate {nq} fascinating quiz questions about: {req.topic} | |
| Niche: {req.niche}. Make answers surprising/shocking for viral appeal. | |
| Return ONLY JSON array: [{{"question":"...?","answer":"...","fun_fact":"one additional interesting sentence"}}]""" | |
| raw=clean_json(await call_openrouter(prompt,"Return ONLY valid JSON array.")) | |
| qas=json.loads(raw) | |
| if not isinstance(qas,list): raise Exception("Quiz parse failed") | |
| qas=qas[:nq] | |
| upd(20,"ποΈ Recording voiceovers...") | |
| clips=[]; total_dur=0 | |
| for i,qa in enumerate(qas): | |
| q=qa.get("question",""); a=qa.get("answer",""); ff=qa.get("fun_fact","") | |
| # Question TTS | |
| qp=job_dir/f"q_{i}.mp3"; await tts(f"Question {i+1}. {q}",req.voice,qp,req.voice_speed) | |
| qd=audio_dur(qp) | |
| # "Think about it..." pause = 2s silence | |
| # Answer TTS | |
| ap=job_dir/f"a_{i}.mp3"; await tts(f"The answer is... {a}. {ff}",req.voice,ap,req.voice_speed) | |
| ad=audio_dur(ap) | |
| # Render question frame | |
| qf=render_quiz_card(q,"",False,W,H,req.niche) | |
| qfp=job_dir/f"qf_{i}.png"; qf.save(str(qfp)) | |
| # Render answer frame | |
| af=render_quiz_card(q,a,True,W,H,req.niche) | |
| afp=job_dir/f"af_{i}.png"; af.save(str(afp)) | |
| # Build clips | |
| qcp=job_dir/f"qclip_{i:03d}.mp4" | |
| subprocess.run(["ffmpeg","-y","-loop","1","-i",str(qfp),"-i",str(qp), | |
| "-c:v","libx264","-c:a","aac","-pix_fmt","yuv420p", | |
| "-shortest","-preset","ultrafast",str(qcp)],capture_output=True) | |
| # 1.5s suspense pause clip | |
| scp=job_dir/f"sclip_{i:03d}.mp4" | |
| subprocess.run(["ffmpeg","-y","-loop","1","-i",str(qfp),"-t","1.5", | |
| "-vf",f"scale={W}:{H}","-r","25","-c:v","libx264", | |
| "-pix_fmt","yuv420p","-an","-preset","ultrafast",str(scp)],capture_output=True) | |
| # Answer clip | |
| acp=job_dir/f"aclip_{i:03d}.mp4" | |
| subprocess.run(["ffmpeg","-y","-loop","1","-i",str(afp),"-i",str(ap), | |
| "-c:v","libx264","-c:a","aac","-pix_fmt","yuv420p", | |
| "-shortest","-preset","ultrafast",str(acp)],capture_output=True) | |
| clips+=[qcp,scp,acp]; total_dur+=qd+1.5+ad | |
| upd(20+int(i/len(qas)*55),f"π§ Q{i+1}/{len(qas)} done...") | |
| upd(75,"π Stitching quiz..."); cat=job_dir/"cat.mp4" | |
| if len(clips)==1: cat=clips[0] | |
| else: concat_clips(clips,cat) | |
| upd(85,"π΅ Music...") | |
| mp=job_dir/"music.wav"; has_mus=gen_music(total_dur+3,req.bg_music,mp) | |
| out=job_dir/"final.mp4" | |
| if has_mus: | |
| r=subprocess.run(["ffmpeg","-y","-i",str(cat),"-i",str(mp), | |
| "-filter_complex","[1:a]volume=0.15[m];[0:a][m]amix=inputs=2:duration=first[aout]", | |
| "-map","0:v","-map","[aout]","-c:v","libx264","-c:a","aac","-b:a","128k", | |
| "-preset","ultrafast",str(out)],capture_output=True) | |
| if r.returncode!=0: subprocess.run(["ffmpeg","-y","-i",str(cat),"-c","copy",str(out)],capture_output=True) | |
| else: | |
| subprocess.run(["ffmpeg","-y","-i",str(cat),"-c","copy",str(out)],capture_output=True) | |
| return out | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN PIPELINE DISPATCHER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def run_pipeline(job_id: str, req: VideoRequest): | |
| job_dir=OUTPUT_DIR/job_id; job_dir.mkdir(parents=True,exist_ok=True) | |
| def upd(pct, msg): | |
| jobs[job_id].update({"progress":pct,"message":msg}) | |
| print(f"[{job_id[:8]}] {pct}% {msg}") | |
| try: | |
| jobs[job_id]["status"]="processing" | |
| t=req.template | |
| if t in ("iphone_chat","instagram_dm"): out=await render_chat_video(job_id,req,job_dir,upd) | |
| elif t=="reddit_story": out=await render_reddit_video(job_id,req,job_dir,upd) | |
| elif t=="quiz": out=await render_quiz_video(job_id,req,job_dir,upd) | |
| else: out=await render_cinematic(job_id,req,job_dir,upd) | |
| jobs[job_id].update({"status":"done","video_path":str(out)}) | |
| upd(100,"β Video ready!") | |
| except Exception as e: | |
| jobs[job_id].update({"status":"error","message":f"β {e}"}) | |
| print(f"[{job_id[:8]}] ERROR: {e}") | |
| # βββ ROUTES βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def home(): return FileResponse("static/index.html") | |
| async def dashboard(): return FileResponse("static/dashboard.html") | |
| async def upload_images(session_id: str, files: List[UploadFile]=File(...)): | |
| d=OUTPUT_DIR/session_id/"uploads"; d.mkdir(parents=True,exist_ok=True) | |
| saved=[] | |
| for f in files: | |
| if f.content_type not in ["image/jpeg","image/png","image/webp","image/jpg"]: continue | |
| p=d/f"{uuid.uuid4().hex[:8]}_{f.filename}"; p.write_bytes(await f.read()); saved.append(f.filename) | |
| return {"uploaded":len(saved),"session_id":session_id} | |
| async def generate(req: VideoRequest, bg: BackgroundTasks): | |
| jid=str(uuid.uuid4()) | |
| jobs[jid]={"status":"queued","progress":0,"message":"β³ Starting...","video_path":None} | |
| bg.add_task(run_pipeline,jid,req) | |
| return {"job_id":jid} | |
| async def status(job_id: str): | |
| if job_id not in jobs: return JSONResponse({"error":"Not found"},status_code=404) | |
| return jobs[job_id] | |
| async def download(job_id: str): | |
| j=jobs.get(job_id) | |
| if not j or j["status"]!="done": return JSONResponse({"error":"Not ready"},status_code=400) | |
| return FileResponse(j["video_path"],media_type="video/mp4",filename="shortsai_video.mp4") | |
| app.mount("/static",StaticFiles(directory="static"),name="static") | |