|
import gradio as gr |
|
import asyncio |
|
import aiohttp |
|
import os |
|
import shutil |
|
import time |
|
import random |
|
import json |
|
from PIL import Image |
|
from aiohttp_socks import ProxyConnector |
|
from typing import List, Dict, Optional, Tuple |
|
|
|
|
|
|
|
TEXT_API_URL = "https://text.pollinations.ai/openai" |
|
IMAGE_API_URL = "https://image.pollinations.ai/prompt" |
|
|
|
|
|
API_KEYS_FILE = "api_keys.txt" |
|
TEMP_DIR = "temp_frames" |
|
OUTPUT_DIR = "output_gifs" |
|
ARCHIVE_DIR = "archives" |
|
|
|
|
|
IMAGE_MODELS_IMG2IMG = ["kontext", "gptimage", "flux"] |
|
|
|
|
|
|
|
class APIKeyRotator: |
|
"""Управляет загрузкой, ротацией и отслеживанием использования API ключей.""" |
|
def __init__(self, filepath: str): |
|
self.filepath = filepath |
|
|
|
self.keys: List[str] = self._load_keys_from_env_or_file() |
|
self.current_index: int = 0 |
|
self.usage_count: Dict[str, int] = {key: 0 for key in self.keys} |
|
|
|
def _load_keys_from_env_or_file(self) -> List[str]: |
|
"""Загружает ключи из переменных окружения (секреты HF) или из файла.""" |
|
|
|
hf_secrets = os.environ.get("POLLINATIONS_API_KEYS") |
|
if hf_secrets: |
|
print("Загрузка API ключей из Секретов Hugging Face.") |
|
return [key.strip() for key in hf_secrets.split(',') if key.strip()] |
|
|
|
|
|
print("Секреты HF не найдены. Попытка загрузки из api_keys.txt.") |
|
try: |
|
with open(self.filepath, 'r', encoding='utf-8') as f: |
|
return [line.strip() for line in f.readlines() if line.strip()] |
|
except FileNotFoundError: |
|
return [] |
|
|
|
def get_next_key(self) -> Optional[str]: |
|
if not self.keys: return None |
|
key = self.keys[self.current_index] |
|
self.usage_count[key] += 1 |
|
self.current_index = (self.current_index + 1) % len(self.keys) |
|
return key |
|
|
|
def get_status(self) -> str: |
|
if not self.keys: |
|
return "🔑 **Статус**: API ключи для генерации изображений не найдены. Добавьте их в Секреты Hugging Face или в файл `api_keys.txt`." |
|
status_lines = [f"🔑 **Загружено ключей (для изображений)**: {len(self.keys)}"] |
|
for i, key in enumerate(self.keys): |
|
masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else key |
|
usage = self.usage_count.get(key, 0) |
|
is_next = " ⬅️" if i == self.current_index else "" |
|
status_lines.append(f" - `{masked_key}` (Исп: {usage}){is_next}") |
|
return "\n".join(status_lines) |
|
|
|
def refresh(self): |
|
self.__init__(self.filepath) |
|
|
|
|
|
|
|
key_rotator = APIKeyRotator(API_KEYS_FILE) |
|
|
|
|
|
|
|
async def _call_director_api(session: aiohttp.ClientSession, prompt: str, seed: int) -> Tuple[Optional[List[str]], str]: |
|
"""Запрашивает сценарий у модели 'openai-fast'. Ключ не требуется.""" |
|
payload = { |
|
"model": "openai-fast", "response_format": {"type": "json_object"}, |
|
"messages": [ |
|
{"role": "system", "content": "You are a scriptwriter for a short animated GIF..."}, |
|
{"role": "user", "content": prompt} |
|
], "seed": seed |
|
} |
|
try: |
|
async with session.post(TEXT_API_URL, json=payload, headers={}, timeout=120) as response: |
|
if response.status == 200: |
|
response_text = await response.text() |
|
prompts_str = json.loads(response_text).get('choices', [{}])[0].get('message', {}).get('content') |
|
prompts = json.loads(prompts_str).get("prompts", []) |
|
return prompts, f"✅ **Режиссёр (`openai-fast`)**: Сценарий на {len(prompts)} кадров получен." |
|
return None, f"❌ **Режиссёр**: Ошибка API (Статус: {response.status})." |
|
except Exception as e: |
|
return None, f"❌ **Режиссёр**: Ошибка. {e}" |
|
|
|
|
|
async def _call_artist_api(session: aiohttp.ClientSession, params: Dict, output_path: str, previous_image_path: Optional[str] = None) -> bool: |
|
"""Генерирует один кадр. Требует API-ключ.""" |
|
max_retries = 3 |
|
for attempt in range(max_retries): |
|
headers = {} |
|
api_key = key_rotator.get_next_key() |
|
if api_key: headers["Authorization"] = f"Bearer {api_key}" |
|
|
|
form = aiohttp.FormData() |
|
params["seed"] = params.get("seed", 0) + attempt * 10 |
|
for key, value in params.items(): form.add_field(key, str(value)) |
|
|
|
if previous_image_path and os.path.exists(previous_image_path): |
|
form.add_field('image', open(previous_image_path, 'rb')) |
|
|
|
try: |
|
async with session.post(IMAGE_API_URL, data=form, headers=headers, timeout=240) as response: |
|
if response.status == 200: |
|
with open(output_path, "wb") as f: f.write(await response.read()) |
|
return True |
|
except Exception as e: |
|
if attempt == max_retries - 1: print(f"Artist API System Error: {e}") |
|
await asyncio.sleep(2) |
|
return False |
|
|
|
|
|
def _compile_gif_and_archive(image_files: List[str], fps: int, timestamp: str) -> str: |
|
"""Собирает GIF, архивирует сессию и возвращает финальный путь к GIF.""" |
|
images = [Image.open(f) for f in image_files] |
|
temp_gif_path = os.path.join(OUTPUT_DIR, f"animation_{timestamp}.gif") |
|
images[0].save(temp_gif_path, save_all=True, append_images=images[1:], duration=1000 // fps, loop=0) |
|
archive_path = os.path.join(ARCHIVE_DIR, timestamp) |
|
frames_archive_path = os.path.join(archive_path, "frames") |
|
shutil.move(os.path.dirname(image_files[0]), frames_archive_path) |
|
final_gif_path = os.path.join(archive_path, os.path.basename(temp_gif_path)) |
|
shutil.move(temp_gif_path, final_gif_path) |
|
return final_gif_path |
|
|
|
|
|
|
|
async def generate_gif_flow( |
|
proxy: str, main_idea: str, fps: int, duration: int, width: int, height: int, |
|
artist_model: str, seed_val: int, delay: int, progress: gr.Progress |
|
) -> Tuple[Optional[str], str]: |
|
status_log = ["🚀 **Старт**: Инициализация..."] |
|
yield "\n".join(status_log) |
|
if not main_idea: raise gr.Error("Пожалуйста, введите основную идею.") |
|
|
|
timestamp = time.strftime("%Y%m%d-%H%M%S") |
|
session_temp_dir = os.path.join(TEMP_DIR, timestamp) |
|
for dir_path in [session_temp_dir, OUTPUT_DIR, ARCHIVE_DIR]: os.makedirs(dir_path, exist_ok=True) |
|
|
|
total_frames = int(fps * duration) |
|
seed = int(seed_val) if seed_val != 0 else random.randint(1, 1000000) |
|
status_log.append(f"🌱 **Параметры**: Seed: `{seed}`, Кадров: `{total_frames}`.") |
|
yield "\n".join(status_log) |
|
|
|
connector = ProxyConnector.from_url(proxy) if proxy else None |
|
async with aiohttp.ClientSession(connector=connector) as session: |
|
director_prompt = f"Based on the core idea '{main_idea}', generate {total_frames} prompts..." |
|
prompts, msg = await _call_director_api(session, director_prompt, seed) |
|
status_log.append(msg); yield "\n".join(status_log) |
|
if not prompts: return None, "\n".join(status_log) |
|
|
|
generated_files, previous_frame_path = [], None |
|
for i in progress.tqdm(range(total_frames), desc="🎨 Генерация кадров"): |
|
header = f"🎬 **Кадр {i + 1}/{total_frames}**" |
|
frame_path = os.path.join(session_temp_dir, f"frame_{i:04d}.png") |
|
params = {"width": width, "height": height, "seed": seed + i, "nologo": "true", "prompt": prompts[i % len(prompts)]} |
|
|
|
params["model"] = "flux" if i == 0 else artist_model |
|
status_log.append(f"{header}: Режим `{'text2img' if i==0 else 'img2img'}` (модель `{params['model']}`).") |
|
yield "\n".join(status_log) |
|
|
|
if await _call_artist_api(session, params, frame_path, previous_frame_path): |
|
status_log.append(f"{header}: ✅ Успешно.") |
|
generated_files.append(frame_path) |
|
previous_frame_path = frame_path |
|
else: |
|
status_log.append(f"❌ **СТОП**: {header}: Не удалось сгенерировать."); yield "\n".join(status_log) |
|
break |
|
|
|
yield "\n".join(status_log) |
|
if i < total_frames - 1 and delay > 0: await asyncio.sleep(delay) |
|
|
|
if not generated_files: |
|
status_log.append("😭 **Результат**: Не создано ни одного кадра.") |
|
return None, "\n".join(status_log) |
|
|
|
status_log.append("🎞️ **Сборка**: Создание GIF и архивация...") |
|
yield "\n".join(status_log) |
|
final_gif_path = _compile_gif_and_archive(generated_files, fps, timestamp) |
|
status_log.append(f"✅ **Готово**: GIF сохранена в `{final_gif_path}`.") |
|
return final_gif_path, "\n".join(status_log) |
|
|
|
|
|
|
|
def create_ui(): |
|
css = "h1 {text-align: center;} footer {display: none !important;}" |
|
with gr.Blocks(css=css, title="🎬 GIF Animator Pro", theme=gr.themes.Soft()) as demo: |
|
gr.HTML("""<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #89f7fe 0%, #66a6ff 100%); border-radius: 10px; margin-bottom: 20px;"><h1 style="color: white; margin: 0; font-size: 2.8em; text-shadow: 2px 2px 4px #00000040;">🎬 GIF Animator Pro</h1></div>""") |
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
main_idea = gr.Textbox(label="💡 1. Главная идея анимации", lines=3) |
|
with gr.Accordion("🤖 2. Выбор модели художника", open=True): |
|
artist_model = gr.Dropdown(IMAGE_MODELS_IMG2IMG, label="Модель-художник", value="kontext") |
|
with gr.Accordion("⚙️ 3. Параметры анимации", open=True): |
|
fps = gr.Slider(5, 30, value=10, step=1, label="Кадров/сек") |
|
duration = gr.Slider(1, 10, value=2, step=1, label="Длительность (сек)") |
|
width = gr.Slider(256, 1024, value=512, step=64, label="Ширина") |
|
height = gr.Slider(256, 1024, value=512, step=64, label="Высота") |
|
with gr.Accordion("🔧 4. Дополнительные настройки", open=False): |
|
seed = gr.Number(label="Seed (0 = случайный)", value=0) |
|
delay = gr.Slider(0, 30, value=2, step=1, label="Пауза между кадрами") |
|
proxy = gr.Textbox(label="SOCKS5 Прокси (опционально)") |
|
generate_btn = gr.Button("✨ Сгенерировать GIF!", variant="primary", size="lg") |
|
with gr.Column(scale=3): |
|
output_gif = gr.Image(label="🎞️ Результат", interactive=False, height=400) |
|
with gr.Tabs(): |
|
with gr.TabItem("📋 Журнал событий"): status_box = gr.Markdown() |
|
with gr.TabItem("🔑 Статус API ключей"): |
|
keys_status = gr.Markdown(value=key_rotator.get_status) |
|
refresh_btn = gr.Button("🔄 Обновить ключи") |
|
|
|
def on_generate_start(): return {generate_btn: gr.update(value="⏳ Генерация...", interactive=False), status_box: None, output_gif: None} |
|
def on_generate_finish(): return {generate_btn: gr.update(value="✨ Сгенерировать!", interactive=True)} |
|
|
|
generate_btn.click(on_generate_start, outputs=[generate_btn, status_box, output_gif]).then( |
|
fn=generate_gif_flow, |
|
inputs=[proxy, main_idea, fps, duration, width, height, artist_model, seed, delay], |
|
outputs=[output_gif, status_box] |
|
).then(on_generate_finish, outputs=[generate_btn]) |
|
|
|
refresh_btn.click(lambda: (key_rotator.refresh(), key_rotator.get_status())[1], outputs=[keys_status]) |
|
|
|
return demo |
|
|
|
if __name__ == "__main__": |
|
ui = create_ui() |
|
|
|
ui.launch() |