Spaces:
Running
Running
| # app.py | |
| import gradio as gr | |
| import asyncio | |
| import os | |
| import random | |
| import traceback | |
| import logging | |
| import threading | |
| from poke_env.player import Player, RandomPlayer | |
| from poke_env import AccountConfiguration, ServerConfiguration | |
| from agents import OpenAIAgent | |
| # --- Configuration --- | |
| POKE_SERVER_URL = "wss://jofthomas.com/showdown/websocket" | |
| POKE_AUTH_URL = "https://jofthomas.com/showdown/action.php" | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s') | |
| # --- Constants --- | |
| RANDOM_PLAYER_BASE_NAME = "RandAgent" | |
| OPENAI_AGENT_BASE_NAME = "OpenAIAgent" | |
| DEFAULT_BATTLE_FORMAT = "gen9randombattle" | |
| custom_config = ServerConfiguration(POKE_SERVER_URL, POKE_AUTH_URL) | |
| # --- Agent Creation (Async - Required by poke-env) --- | |
| async def create_agent_async(agent_type: str, battle_format: str = DEFAULT_BATTLE_FORMAT) -> Player | str: | |
| """ | |
| Creates and initializes a SINGLE agent instance with a unique username. | |
| This function MUST be async because Player initialization involves async network setup. | |
| Returns the Player object on success, or an error string on failure. | |
| """ | |
| logging.info(f"Attempting to create agent of type: {agent_type}") | |
| player: Player | None = None | |
| error_message: str | None = None | |
| username: str = "unknown_agent" | |
| agent_suffix = random.randint(10000, 999999) | |
| try: | |
| if agent_type == "Random Player": | |
| username = f"{RANDOM_PLAYER_BASE_NAME}{agent_suffix}" | |
| account_config = AccountConfiguration(username, None) | |
| logging.info(f"Initializing RandomPlayer with username: {username}") | |
| player = RandomPlayer( | |
| account_configuration=account_config, | |
| server_configuration=custom_config, | |
| battle_format=battle_format, | |
| start_listening=True, | |
| ) | |
| elif agent_type == "OpenAI Agent": | |
| if not os.getenv("OPENAI_API_KEY"): | |
| error_message = "Error: Cannot create OpenAI Agent. OPENAI_API_KEY environment variable is missing." | |
| logging.error(error_message) | |
| return error_message | |
| username = f"{OPENAI_AGENT_BASE_NAME}{agent_suffix}" | |
| account_config = AccountConfiguration(username, None) | |
| logging.info(f"Initializing OpenAIAgent with username: {username}") | |
| player = OpenAIAgent( | |
| account_configuration=account_config, | |
| server_configuration=custom_config, | |
| battle_format=battle_format, | |
| start_listening=True, | |
| ) | |
| else: | |
| error_message = f"Error: Invalid agent type '{agent_type}' requested." | |
| logging.error(error_message) | |
| return error_message | |
| logging.info(f"Agent object ({username}) created successfully.") | |
| return player | |
| except Exception as e: | |
| error_message = f"Error creating agent {username}: {e}" | |
| logging.error(error_message) | |
| logging.error(traceback.format_exc()) | |
| return error_message | |
| # --- Battle Invitation (Async - Required by poke-env) --- | |
| async def send_battle_invite_async(player: Player, opponent_username: str, battle_format: str) -> str: | |
| """ | |
| Sends a challenge using the provided player object. | |
| This function MUST be async as sending challenges involves network I/O. | |
| Returns a status string (success or error message). | |
| """ | |
| if not isinstance(player, Player): | |
| err_msg = f"Internal Error: Invalid object passed instead of Player: {type(player)}" | |
| logging.error(err_msg) | |
| # In background thread, we might just log this and exit thread | |
| raise TypeError(err_msg) # Raise exception to be caught by the thread runner | |
| player_username = getattr(player, 'username', 'unknown_agent') | |
| try: | |
| logging.info(f"Attempting to send challenge from {player_username} to {opponent_username} in format {battle_format}") | |
| await player.send_challenges(opponent_username, n_challenges=1) | |
| success_msg = f"Battle invitation ({battle_format}) sent to '{opponent_username}' from bot '{player_username}'." | |
| logging.info(success_msg) | |
| return success_msg # Indicate success | |
| except Exception as e: | |
| error_msg = f"Error sending challenge from {player_username} to {opponent_username}: {e}" | |
| logging.error(error_msg) | |
| logging.error(traceback.format_exc()) | |
| # Re-raise or return error indication for the thread runner | |
| raise e # Raise exception to be caught by the thread runner | |
| # --- Background Task Runner (Runs in a separate thread) --- | |
| def run_invite_in_background(agent_choice: str, target_username: str, battle_format: str): | |
| """ | |
| This function runs in a separate thread for each invite request. | |
| It sets up and runs the asyncio operations needed for one invite. | |
| """ | |
| thread_name = threading.current_thread().name | |
| logging.info(f"Background thread '{thread_name}' started for {agent_choice} vs {target_username}.") | |
| async def _run_async_challenge_steps(): | |
| """The async steps to be run via asyncio.run() in this thread.""" | |
| agent_or_error = await create_agent_async(agent_choice, battle_format) | |
| if isinstance(agent_or_error, str): | |
| # Agent creation failed, log the error message from create_agent_async | |
| logging.error(f"[{thread_name}] Agent creation failed: {agent_or_error}") | |
| # No further action needed in this thread | |
| return | |
| player_instance: Player = agent_or_error | |
| player_username = getattr(player_instance, 'username', 'agent') | |
| logging.info(f"[{thread_name}] Agent {player_username} created, proceeding to challenge {target_username}.") | |
| try: | |
| result = await send_battle_invite_async(player_instance, target_username, battle_format) | |
| # Log the success message from send_battle_invite_async | |
| logging.info(f"[{thread_name}] Challenge result: {result}") | |
| except Exception as invite_error: | |
| # Log errors from send_battle_invite_async | |
| # Error message/traceback already logged inside send_battle_invite_async | |
| logging.error(f"[{thread_name}] Failed to send challenge from {player_username} to {target_username}. Error: {invite_error}") | |
| finally: | |
| pass | |
| try: | |
| asyncio.run(_run_async_challenge_steps()) | |
| logging.info(f"Background thread '{thread_name}' finished successfully for {target_username}.") | |
| except RuntimeError as e: | |
| logging.error(f"[{thread_name}] asyncio RuntimeError: {e}") | |
| logging.error(traceback.format_exc()) | |
| except Exception as e: | |
| logging.error(f"[{thread_name}] Unexpected error in background task: {e}") | |
| logging.error(traceback.format_exc()) | |
| # --- Gradio Interface Logic (Starts the background thread) --- | |
| def start_invite_thread(agent_choice: str, username: str) -> str: | |
| """ | |
| Handles the Gradio button click (Synchronous, but FAST). | |
| Performs basic validation and starts a background thread to handle | |
| the actual agent creation and invitation process. | |
| Returns an immediate status message to Gradio. | |
| """ | |
| username_clean = username.strip() | |
| if not username_clean: | |
| return "⚠️ Please enter your Showdown username." | |
| if not agent_choice: | |
| return "⚠️ Please select an agent type." | |
| logging.info(f"Received request: Agent={agent_choice}, Opponent={username_clean}. Starting background thread.") | |
| # Create and start the background thread | |
| thread = threading.Thread( | |
| target=run_invite_in_background, | |
| args=(agent_choice, username_clean, DEFAULT_BATTLE_FORMAT), | |
| daemon=True # Set as daemon so threads don't block app exit | |
| ) | |
| thread.start() | |
| # Return immediately to Gradio UI | |
| return f"✅ Invite process for '{username_clean}' started in background. Check Pokémon Showdown and logs for status." | |
| # --- Gradio UI Definition --- | |
| # [ main_app function remains the same, but the button click now calls start_invite_thread ] | |
| def main_app(): | |
| """Creates and returns the Gradio application interface.""" | |
| agent_options = ["Random Player"] | |
| agent_options.append("OpenAI Agent") | |
| # Use a more descriptive title if possible | |
| with gr.Blocks(title="Pokemon Showdown Multi-Challenger") as demo: | |
| gr.Markdown("# Pokémon Battle Agent Challenger") | |
| gr.Markdown( | |
| "1. Choose a name in the Iframe, if you have an account, you can also connect.\n" | |
| "2. Select an agent type.\n" | |
| "3. Enter **your** Showdown username (the one you are logged in with below).\n" | |
| "4. Click 'Send Battle Invitation'. You can click multiple times for different users.\n\n" | |
| "A temporary bot will be created *in the background* to send the challenge in `gen9randombattle` format." | |
| ) | |
| with gr.Row(): | |
| agent_dropdown = gr.Dropdown( | |
| label="Select Agent", choices=agent_options, value=agent_options[0], scale=1 | |
| ) | |
| name_input = gr.Textbox( | |
| label="Your Pokémon Showdown Username", placeholder="Enter username used in Showdown below", scale=2 | |
| ) | |
| #variant="primary" | |
| battle_button = gr.Button("Send Battle Invitation", scale=1) | |
| gr.HTML(""" | |
| <iframe | |
| src="https://jofthomas.com/play.pokemonshowdown.com/testclient.html" | |
| width="100%" height="800" style="border: none;" referrerpolicy="no-referrer"> | |
| </iframe> | |
| """) | |
| # *** IMPORTANT: Update the click handler *** | |
| battle_button.click( | |
| fn=start_invite_thread, # Calls the function that starts the thread | |
| inputs=[agent_dropdown, name_input], | |
| ) | |
| return demo | |
| # --- Application Entry Point --- | |
| # [ if __name__ == "__main__": block remains the same ] | |
| if __name__ == "__main__": | |
| app = main_app() | |
| app.launch() |