"""Planner agent: objective decomposition and result synthesis tools.

This module exposes two LLM-backed tool functions (``plan`` and
``synthesize_and_respond``), wraps them as ``FunctionTool`` objects, and
provides ``initialize_planner_agent`` which builds a ``ReActAgent`` that uses
those tools and may hand off to a fixed list of other agents.

Configuration is read from environment variables (loaded via ``dotenv``):
``GEMINI_API_KEY``, ``PLANNER_TOOL_LLM_MODEL``, ``SYNTHESIZER_LLM_MODEL`` and
``PLANNER_AGENT_LLM_MODEL``.
"""

import logging
import os
import re
from typing import Dict, List

from dotenv import load_dotenv
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI

# Load environment variables
load_dotenv()

# Setup logging
logger = logging.getLogger(__name__)

# Matches one leading list marker on an already-stripped line:
#   * "1." / "12)" style numbering -- the lookahead refuses to match when the
#     marker is immediately followed by a digit, so a step that starts with a
#     decimal such as "1.5x growth" is left intact;
#   * "-", "*" or "•" bullets, which must be followed by whitespace.
_LIST_MARKER_RE = re.compile(r"^(?:\d+[.)]\s*(?=\D|$)|[-*•]\s+)")


# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Load a prompt from a text file located relative to this script.

    Args:
        filename: Path of the prompt file, relative to this module's directory.
        default_prompt: Value returned when the file cannot be read.

    Returns:
        The file content, or ``default_prompt`` if the file is missing or any
        other error occurs while reading it.
    """
    try:
        # The prompt file is resolved relative to the agent script's directory.
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        # Explicit encoding: the platform default is locale-dependent and can
        # mis-decode (or fail on) non-ASCII prompt text.
        with open(prompt_path, "r", encoding="utf-8") as f:
            prompt = f.read()
        logger.info("Successfully loaded prompt from %s", prompt_path)
        return prompt
    except FileNotFoundError:
        logger.warning(
            "Prompt file %s not found at %s. Using default.", filename, prompt_path
        )
        return default_prompt
    except Exception as e:
        # Best-effort loader: any other failure also falls back to the default.
        logger.exception("Error loading prompt file %s: %s", filename, e)
        return default_prompt


# --- Tool Functions ---

def _parse_sub_steps(raw_text: str) -> List[str]:
    """Split raw LLM output into clean sub-step strings, one per non-empty line."""
    sub_steps = []
    for raw_line in raw_text.splitlines():
        # Drop a single leading list marker ("1. ", "2) ", "- ", "* ", "• ").
        text = _LIST_MARKER_RE.sub("", raw_line.strip(), count=1).strip()
        if text:
            sub_steps.append(text)
    return sub_steps


def plan(objective: str) -> List[str]:
    """
    Generate a list of sub-steps (4-8) from the given objective using an LLM.

    Args:
        objective (str): The research or task objective.

    Returns:
        List[str]: A list of sub-steps as strings, or a single-element list
        holding an error message (missing objective, missing API key, empty
        LLM output, or a failed LLM call).
    """
    # Guard against empty input: avoids a TypeError on slicing ``None`` and a
    # pointless LLM round-trip for a blank objective.
    if not isinstance(objective, str) or not objective.strip():
        logger.warning("Plan called with an empty objective.")
        return ["Error: No objective provided for planning."]

    logger.info("Generating plan for objective: %s...", objective[:100])

    # Configuration for planning LLM
    planner_llm_model = os.getenv("PLANNER_TOOL_LLM_MODEL", "models/gemini-1.5-pro")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for planning tool LLM.")
        return ["Error: GEMINI_API_KEY not set for planning."]

    # Prompt for the LLM to generate sub-steps
    input_prompt = (
        "You are a research assistant. "
        "Given an objective, break it down into a list of 4-8 concise, actionable sub-steps. "
        "Ensure the steps are logically ordered.\n"
        f"Objective: {objective}\n"
        "Sub-steps (one per line, numbered):"
    )

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=planner_llm_model)
        logger.info("Using planning LLM: %s", planner_llm_model)
        response = llm.complete(input_prompt)

        # Post-process: one sub-step per line, leading numbering/bullets removed.
        sub_steps = _parse_sub_steps(response.text or "")
        if not sub_steps:
            logger.warning("LLM generated no sub-steps for the objective.")
            return ["Error: Failed to generate sub-steps."]

        logger.info("Generated %d sub-steps.", len(sub_steps))
        return sub_steps
    except Exception as e:
        # Tool boundary: report the failure to the calling agent as data
        # instead of raising.
        logger.exception("LLM call failed during planning: %s", e)
        return [f"Error during planning: {e}"]


def synthesize_and_respond(results: List[Dict[str, str]]) -> str:
    """
    Aggregate results from sub-steps into a coherent final report using an LLM.

    Args:
        results (List[Dict[str, str]]): List of dictionaries, each with
            "sub_step" and "answer" keys. Missing keys fall back to
            placeholder text; an entry that is not a dict is used as the
            answer text of its step.

    Returns:
        str: A unified, well-structured response, or an error message.
    """
    logger.info(
        "Synthesizing results from %d sub-steps...", len(results) if results else 0
    )
    if not results:
        logger.warning("Synthesize called with empty results list.")
        return "No results provided to synthesize."

    # Format the results for the synthesis prompt
    blocks = []
    for index, result in enumerate(results, start=1):
        if isinstance(result, dict):
            sub_step = result.get("sub_step", f"Step {index}")
            answer = result.get("answer", "No answer provided.")
        else:
            # The argument is produced by an LLM tool call, so tolerate a
            # malformed entry rather than raising AttributeError on ``.get``.
            sub_step = f"Step {index}"
            answer = str(result)
        blocks.append(f"Sub-step {index}: {sub_step}\nAnswer {index}: {answer}")
    summary_blocks = "\n\n".join(blocks)

    # Configuration for synthesis LLM
    synthesizer_llm_model = os.getenv("SYNTHESIZER_LLM_MODEL", "models/gemini-1.5-pro")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for synthesis tool LLM.")
        return "Error: GEMINI_API_KEY not set for synthesis."

    # Prompt for the LLM
    input_prompt = (
        "You are an expert synthesizer. Given the following sub-steps and their "
        "answers derived from an initial objective, produce a single, coherent, "
        "comprehensive final report that addresses the original objective:\n"
        "\n"
        "--- SUB-STEP RESULTS ---\n"
        f"{summary_blocks.strip()}\n"
        "--- END SUB-STEP RESULTS ---\n"
        "\n"
        "Generate the Final Report:\n"
    )

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=synthesizer_llm_model)
        logger.info("Using synthesis LLM: %s", synthesizer_llm_model)
        response = llm.complete(input_prompt)
        logger.info("Synthesis successful.")
        return response.text
    except Exception as e:
        # Tool boundary: report the failure to the calling agent as data
        # instead of raising.
        logger.exception("LLM call failed during synthesis: %s", e)
        return f"Error during synthesis: {e}"


# --- Tool Definitions ---

synthesize_tool = FunctionTool.from_defaults(
    fn=synthesize_and_respond,
    name="synthesize_and_respond",
    description=(
        "Aggregates results from multiple sub-steps into a final coherent report. "
        "Input: results (List[Dict[str, str]]) where each dict has \"sub_step\" and \"answer\". "
        "Output: A unified report (str) or error message."
    ),
)

generate_substeps_tool = FunctionTool.from_defaults(
    fn=plan,
    name="generate_substeps",
    description=(
        "Decomposes a high-level objective into a concise roadmap of 4–8 actionable sub-steps using an LLM. "
        "Input: objective (str). Output: List of sub-step strings (List[str]) or error list."
    ),
)


# --- Agent Initialization ---

def initialize_planner_agent() -> ReActAgent:
    """Initialize the Planner Agent.

    Builds a ``ReActAgent`` named ``planner_agent`` with the
    ``generate_substeps`` and ``synthesize_and_respond`` tools, using the model
    named by ``PLANNER_AGENT_LLM_MODEL`` (default ``models/gemini-1.5-pro``).

    Returns:
        ReActAgent: The configured planner agent.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set.
        Exception: Any error raised while constructing the LLM or the agent is
            logged and re-raised unchanged.
    """
    logger.info("Initializing PlannerAgent...")

    # Configuration for the agent's main LLM
    agent_llm_model = os.getenv("PLANNER_AGENT_LLM_MODEL", "models/gemini-1.5-pro")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for PlannerAgent.")
        raise ValueError("GEMINI_API_KEY must be set for PlannerAgent")

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model)
        logger.info("Using agent LLM: %s", agent_llm_model)

        # Load system prompt; the default below is a placeholder used only
        # when the prompt file cannot be read.
        default_system_prompt = (
            "You are PlannerAgent... [Default prompt content - replace with actual]"
        )
        system_prompt = load_prompt_from_file(
            "../prompts/planner_agent_prompt.txt", default_system_prompt
        )
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for PlannerAgent.")

        # Define available tools
        tools = [generate_substeps_tool, synthesize_tool]

        # Define valid handoff targets
        valid_handoffs = [
            "code_agent",
            "research_agent",
            "math_agent",
            "role_agent",
            "image_analyzer_agent",
            "text_analyzer_agent",
            "verifier_agent",
            "reasoning_agent",
        ]

        agent = ReActAgent(
            name="planner_agent",
            description=(
                "Strategically plans tasks by breaking down objectives into sub-steps using `generate_substeps`. "
                "Orchestrates execution by handing off sub-steps to specialized agents. "
                "Synthesizes final results using `synthesize_and_respond`."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("PlannerAgent initialized successfully.")
        return agent
    except Exception as e:
        logger.exception("Error during PlannerAgent initialization: %s", e)
        raise


def _run_manual_test() -> None:
    """Exercise ``plan`` and ``synthesize_and_respond`` once and print the results."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )
    logger.info("Running planner_agent.py directly for testing...")

    # Ensure API key is set
    if not os.getenv("GEMINI_API_KEY"):
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")
        return

    try:
        # Test plan generation
        print("\nTesting plan generation...")
        test_objective = "Analyze the market trends for electric vehicles in Europe for 2024."
        substeps = plan(test_objective)
        print(f"Generated Sub-steps:\n{substeps}")

        # Test synthesis
        print("\nTesting synthesis...")
        test_results = [
            {"sub_step": "Identify key EV manufacturers in Europe.", "answer": "Tesla, VW, Stellantis, Renault."},
            {"sub_step": "Find recent sales data.", "answer": "EV sales grew 25% year-over-year in Q1 2024."},
            {"sub_step": "Analyze government incentives.", "answer": "Germany reduced subsidies, France maintained them."},
        ]
        report = synthesize_and_respond(test_results)
        print(f"Synthesized Report:\n{report}")

        # Initialize the agent (optional)
        # test_agent = initialize_planner_agent()
        # print("\nPlanner Agent initialized successfully for testing.")
    except Exception as e:
        print(f"Error during testing: {e}")


# Example usage (for testing if run directly)
if __name__ == "__main__":
    _run_manual_test()