Rooni's picture
Update app.py
37785ec verified
import gradio as gr
import asyncio
import aiohttp
import os
import shutil
import time
import random
import json
from PIL import Image
from aiohttp_socks import ProxyConnector
from typing import List, Dict, Optional, Tuple
# --- КОНФИГУРАЦИЯ ---
# URL адреса API
TEXT_API_URL = "https://text.pollinations.ai/openai"
IMAGE_API_URL = "https://image.pollinations.ai/prompt"
# Файлы и директории
API_KEYS_FILE = "api_keys.txt" # Ключи только для генерации ИЗОБРАЖЕНИЙ
TEMP_DIR = "temp_frames"
OUTPUT_DIR = "output_gifs"
ARCHIVE_DIR = "archives"
# Модели для генерации изображений
IMAGE_MODELS_IMG2IMG = ["kontext", "gptimage", "flux"]
# --- КЛАСС ДЛЯ РОТАЦИИ API КЛЮЧЕЙ (ДЛЯ ХУДОЖНИКА) ---
class APIKeyRotator:
"""Управляет загрузкой, ротацией и отслеживанием использования API ключей."""
def __init__(self, filepath: str):
self.filepath = filepath
# Пытаемся загрузить ключи из Секретов Hugging Face, если не получается - из файла
self.keys: List[str] = self._load_keys_from_env_or_file()
self.current_index: int = 0
self.usage_count: Dict[str, int] = {key: 0 for key in self.keys}
def _load_keys_from_env_or_file(self) -> List[str]:
"""Загружает ключи из переменных окружения (секреты HF) или из файла."""
# Рекомендованный способ: Секреты Hugging Face
hf_secrets = os.environ.get("POLLINATIONS_API_KEYS")
if hf_secrets:
print("Загрузка API ключей из Секретов Hugging Face.")
return [key.strip() for key in hf_secrets.split(',') if key.strip()]
# Запасной вариант: Файл api_keys.txt
print("Секреты HF не найдены. Попытка загрузки из api_keys.txt.")
try:
with open(self.filepath, 'r', encoding='utf-8') as f:
return [line.strip() for line in f.readlines() if line.strip()]
except FileNotFoundError:
return []
def get_next_key(self) -> Optional[str]:
if not self.keys: return None
key = self.keys[self.current_index]
self.usage_count[key] += 1
self.current_index = (self.current_index + 1) % len(self.keys)
return key
def get_status(self) -> str:
if not self.keys:
return "🔑 **Статус**: API ключи для генерации изображений не найдены. Добавьте их в Секреты Hugging Face или в файл `api_keys.txt`."
status_lines = [f"🔑 **Загружено ключей (для изображений)**: {len(self.keys)}"]
for i, key in enumerate(self.keys):
masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else key
usage = self.usage_count.get(key, 0)
is_next = " ⬅️" if i == self.current_index else ""
status_lines.append(f" - `{masked_key}` (Исп: {usage}){is_next}")
return "\n".join(status_lines)
def refresh(self):
self.__init__(self.filepath)
# --- ГЛОБАЛЬНЫЕ ОБЪЕКТЫ ---
key_rotator = APIKeyRotator(API_KEYS_FILE)
# --- АСИНХРОННЫЕ HELPER-ФУНКЦИИ ---
async def _call_director_api(session: aiohttp.ClientSession, prompt: str, seed: int) -> Tuple[Optional[List[str]], str]:
"""Запрашивает сценарий у модели 'openai-fast'. Ключ не требуется."""
payload = {
"model": "openai-fast", "response_format": {"type": "json_object"},
"messages": [
{"role": "system", "content": "You are a scriptwriter for a short animated GIF..."},
{"role": "user", "content": prompt}
], "seed": seed
}
try:
async with session.post(TEXT_API_URL, json=payload, headers={}, timeout=120) as response:
if response.status == 200:
response_text = await response.text()
prompts_str = json.loads(response_text).get('choices', [{}])[0].get('message', {}).get('content')
prompts = json.loads(prompts_str).get("prompts", [])
return prompts, f"✅ **Режиссёр (`openai-fast`)**: Сценарий на {len(prompts)} кадров получен."
return None, f"❌ **Режиссёр**: Ошибка API (Статус: {response.status})."
except Exception as e:
return None, f"❌ **Режиссёр**: Ошибка. {e}"
async def _call_artist_api(session: aiohttp.ClientSession, params: Dict, output_path: str, previous_image_path: Optional[str] = None) -> bool:
"""Генерирует один кадр. Требует API-ключ."""
max_retries = 3
for attempt in range(max_retries):
headers = {}
api_key = key_rotator.get_next_key()
if api_key: headers["Authorization"] = f"Bearer {api_key}"
form = aiohttp.FormData()
params["seed"] = params.get("seed", 0) + attempt * 10
for key, value in params.items(): form.add_field(key, str(value))
if previous_image_path and os.path.exists(previous_image_path):
form.add_field('image', open(previous_image_path, 'rb'))
try:
async with session.post(IMAGE_API_URL, data=form, headers=headers, timeout=240) as response:
if response.status == 200:
with open(output_path, "wb") as f: f.write(await response.read())
return True
except Exception as e:
if attempt == max_retries - 1: print(f"Artist API System Error: {e}")
await asyncio.sleep(2)
return False
def _compile_gif_and_archive(image_files: List[str], fps: int, timestamp: str) -> str:
"""Собирает GIF, архивирует сессию и возвращает финальный путь к GIF."""
images = [Image.open(f) for f in image_files]
temp_gif_path = os.path.join(OUTPUT_DIR, f"animation_{timestamp}.gif")
images[0].save(temp_gif_path, save_all=True, append_images=images[1:], duration=1000 // fps, loop=0)
archive_path = os.path.join(ARCHIVE_DIR, timestamp)
frames_archive_path = os.path.join(archive_path, "frames")
shutil.move(os.path.dirname(image_files[0]), frames_archive_path)
final_gif_path = os.path.join(archive_path, os.path.basename(temp_gif_path))
shutil.move(temp_gif_path, final_gif_path)
return final_gif_path
# --- ОСНОВНАЯ ЛОГИКА ГЕНЕРАЦИИ ---
async def generate_gif_flow(
proxy: str, main_idea: str, fps: int, duration: int, width: int, height: int,
artist_model: str, seed_val: int, delay: int, progress: gr.Progress
) -> Tuple[Optional[str], str]:
status_log = ["🚀 **Старт**: Инициализация..."]
yield "\n".join(status_log)
if not main_idea: raise gr.Error("Пожалуйста, введите основную идею.")
timestamp = time.strftime("%Y%m%d-%H%M%S")
session_temp_dir = os.path.join(TEMP_DIR, timestamp)
for dir_path in [session_temp_dir, OUTPUT_DIR, ARCHIVE_DIR]: os.makedirs(dir_path, exist_ok=True)
total_frames = int(fps * duration)
seed = int(seed_val) if seed_val != 0 else random.randint(1, 1000000)
status_log.append(f"🌱 **Параметры**: Seed: `{seed}`, Кадров: `{total_frames}`.")
yield "\n".join(status_log)
connector = ProxyConnector.from_url(proxy) if proxy else None
async with aiohttp.ClientSession(connector=connector) as session:
director_prompt = f"Based on the core idea '{main_idea}', generate {total_frames} prompts..."
prompts, msg = await _call_director_api(session, director_prompt, seed)
status_log.append(msg); yield "\n".join(status_log)
if not prompts: return None, "\n".join(status_log)
generated_files, previous_frame_path = [], None
for i in progress.tqdm(range(total_frames), desc="🎨 Генерация кадров"):
header = f"🎬 **Кадр {i + 1}/{total_frames}**"
frame_path = os.path.join(session_temp_dir, f"frame_{i:04d}.png")
params = {"width": width, "height": height, "seed": seed + i, "nologo": "true", "prompt": prompts[i % len(prompts)]}
params["model"] = "flux" if i == 0 else artist_model
status_log.append(f"{header}: Режим `{'text2img' if i==0 else 'img2img'}` (модель `{params['model']}`).")
yield "\n".join(status_log)
if await _call_artist_api(session, params, frame_path, previous_frame_path):
status_log.append(f"{header}: ✅ Успешно.")
generated_files.append(frame_path)
previous_frame_path = frame_path
else:
status_log.append(f"❌ **СТОП**: {header}: Не удалось сгенерировать."); yield "\n".join(status_log)
break
yield "\n".join(status_log)
if i < total_frames - 1 and delay > 0: await asyncio.sleep(delay)
if not generated_files:
status_log.append("😭 **Результат**: Не создано ни одного кадра.")
return None, "\n".join(status_log)
status_log.append("🎞️ **Сборка**: Создание GIF и архивация...")
yield "\n".join(status_log)
final_gif_path = _compile_gif_and_archive(generated_files, fps, timestamp)
status_log.append(f"✅ **Готово**: GIF сохранена в `{final_gif_path}`.")
return final_gif_path, "\n".join(status_log)
# --- ИНТЕРФЕЙС GRADIO ---
def create_ui():
css = "h1 {text-align: center;} footer {display: none !important;}"
with gr.Blocks(css=css, title="🎬 GIF Animator Pro", theme=gr.themes.Soft()) as demo:
gr.HTML("""<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #89f7fe 0%, #66a6ff 100%); border-radius: 10px; margin-bottom: 20px;"><h1 style="color: white; margin: 0; font-size: 2.8em; text-shadow: 2px 2px 4px #00000040;">🎬 GIF Animator Pro</h1></div>""")
with gr.Row():
with gr.Column(scale=2):
main_idea = gr.Textbox(label="💡 1. Главная идея анимации", lines=3)
with gr.Accordion("🤖 2. Выбор модели художника", open=True):
artist_model = gr.Dropdown(IMAGE_MODELS_IMG2IMG, label="Модель-художник", value="kontext")
with gr.Accordion("⚙️ 3. Параметры анимации", open=True):
fps = gr.Slider(5, 30, value=10, step=1, label="Кадров/сек")
duration = gr.Slider(1, 10, value=2, step=1, label="Длительность (сек)")
width = gr.Slider(256, 1024, value=512, step=64, label="Ширина")
height = gr.Slider(256, 1024, value=512, step=64, label="Высота")
with gr.Accordion("🔧 4. Дополнительные настройки", open=False):
seed = gr.Number(label="Seed (0 = случайный)", value=0)
delay = gr.Slider(0, 30, value=2, step=1, label="Пауза между кадрами")
proxy = gr.Textbox(label="SOCKS5 Прокси (опционально)")
generate_btn = gr.Button("✨ Сгенерировать GIF!", variant="primary", size="lg")
with gr.Column(scale=3):
output_gif = gr.Image(label="🎞️ Результат", interactive=False, height=400)
with gr.Tabs():
with gr.TabItem("📋 Журнал событий"): status_box = gr.Markdown()
with gr.TabItem("🔑 Статус API ключей"):
keys_status = gr.Markdown(value=key_rotator.get_status)
refresh_btn = gr.Button("🔄 Обновить ключи")
def on_generate_start(): return {generate_btn: gr.update(value="⏳ Генерация...", interactive=False), status_box: None, output_gif: None}
def on_generate_finish(): return {generate_btn: gr.update(value="✨ Сгенерировать!", interactive=True)}
generate_btn.click(on_generate_start, outputs=[generate_btn, status_box, output_gif]).then(
fn=generate_gif_flow,
inputs=[proxy, main_idea, fps, duration, width, height, artist_model, seed, delay],
outputs=[output_gif, status_box]
).then(on_generate_finish, outputs=[generate_btn])
refresh_btn.click(lambda: (key_rotator.refresh(), key_rotator.get_status())[1], outputs=[keys_status])
return demo
if __name__ == "__main__":
ui = create_ui()
# Для запуска на Hugging Face Spaces и локально используем launch() без аргументов.
ui.launch()