import os import time import logging import re # Import regex for video ID extraction from typing import List, Optional, Dict # Added Dict from dotenv import load_dotenv from llama_index.core.agent.workflow import ReActAgent from llama_index.core.tools import FunctionTool from llama_index.llms.google_genai import GoogleGenAI from llama_index.tools.google import GoogleSearchToolSpec from llama_index.tools.tavily_research import TavilyToolSpec from llama_index.tools.wikipedia import WikipediaToolSpec from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec from llama_index.tools.yahoo_finance import YahooFinanceToolSpec from llama_index.tools.arxiv import ArxivToolSpec # Attempt to import browser tools; handle import errors gracefully try: from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.common.exceptions import WebDriverException, NoSuchElementException, TimeoutException from helium import start_chrome, go_to, find_all, Text, kill_browser, get_driver, click, write, press SELENIUM_AVAILABLE = True except ImportError: logging.warning("Selenium or Helium not installed. Browser interaction tools will be unavailable.") SELENIUM_AVAILABLE = False # Attempt to import YouTube transcript API try: from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound YOUTUBE_TRANSCRIPT_API_AVAILABLE = True except ImportError: logging.warning("youtube-transcript-api not installed. YouTube transcript tool will be unavailable.") YOUTUBE_TRANSCRIPT_API_AVAILABLE = False # Load environment variables load_dotenv() # Setup logging logger = logging.getLogger(__name__) # --- Helper function to extract YouTube Video ID --- def extract_video_id(url: str) -> Optional[str]: """Extracts the YouTube video ID from various URL formats.""" # Standard watch URL: https://www.youtube.com/watch?v=VIDEO_ID match = re.search(r'(?:v=|/v/|embed/|youtu\.be/|/shorts/)([A-Za-z0-9_-]+)', url) if match: return match.group(1) return None # --- YouTube Transcript Tool --- def get_youtube_transcript(video_url_or_id: str, languages=None) -> str: """Fetches the transcript for a YouTube video using its URL or video ID. Specify preferred languages as a list (e.g., ["en", "es"]). Returns the transcript text or an error message. """ if languages is None: languages = ["en"] if not YOUTUBE_TRANSCRIPT_API_AVAILABLE: return "Error: youtube-transcript-api library is required but not installed." logger.info(f"Attempting to fetch YouTube transcript for: {video_url_or_id}") video_id = extract_video_id(video_url_or_id) if not video_id: # Assume it might be an ID already if extraction fails if re.match(r"^[a-zA-Z0-9_\-]+$", video_url_or_id): video_id = video_url_or_id logger.info("Input treated as video ID.") else: logger.error(f"Could not extract valid YouTube video ID from: {video_url_or_id}") return f"Error: Invalid YouTube URL or Video ID format: {video_url_or_id}" try: # Fetch available transcripts api = YouTubeTranscriptApi() transcript_list = api.list(video_id) # Try to find a transcript in the specified languages transcript = transcript_list.find_transcript(languages) # Fetch the actual transcript data (list of dicts) transcript_data = transcript.fetch() # Combine the text parts into a single string full_transcript = " ".join(snippet.text for snippet in transcript_data) full_transcript = " ".join(snippet.text for snippet in transcript_data) logger.info(f"Successfully fetched transcript for video ID {video_id} in language {transcript.language}.") return full_transcript except TranscriptsDisabled: logger.warning(f"Transcripts are disabled for video ID: {video_id}") return f"Error: Transcripts are disabled for this video (ID: {video_id})." except NoTranscriptFound as e: logger.warning(f"No transcript found for video ID {video_id} in languages {languages}. Available: {e.available_transcripts}") # Try fetching any available transcript if specific languages failed try: logger.info(f"Attempting to fetch any available transcript for {video_id}") any_transcript = transcript_list.find_generated_transcript(transcript_list.manually_created_transcripts.keys() or transcript_list.generated_transcripts.keys()) any_transcript_data = any_transcript.fetch() full_transcript = " ".join([item["text"] for item in any_transcript_data]) logger.info(f"Successfully fetched fallback transcript for video ID {video_id} in language {any_transcript.language}.") return full_transcript except Exception as fallback_e: logger.error(f"Could not find any transcript for video ID {video_id}. Original error: {e}. Fallback error: {fallback_e}") return f"Error: No transcript found for video ID {video_id} in languages {languages} or any fallback language." except Exception as e: logger.error(f"Unexpected error fetching transcript for video ID {video_id}: {e}", exc_info=True) return f"Error fetching transcript: {e}" # --- Browser Interaction Tools (Conditional on Selenium/Helium availability) --- # Global browser instance (managed by initializer) _browser_instance = None _browser_driver = None # Helper decorator for browser tool error handling and logging def browser_tool_handler(func): def wrapper(*args, **kwargs): if not SELENIUM_AVAILABLE: return "Error: Browser tools require Selenium and Helium to be installed." if _browser_instance is None or _browser_driver is None: # Attempt to initialize if not already done (e.g., if called directly) # This is not ideal, initialization should happen via get_research_initializer() logger.warning("Browser accessed before explicit initialization. Attempting to initialize now.") try: get_research_initializer() # This will initialize the browser if _browser_instance is None or _browser_driver is None: return "Error: Browser initialization failed." except Exception as init_err: return f"Error: Browser initialization failed: {init_err}" func_name = func.__name__ logger.info(f"Executing browser tool: {func_name} with args: {args}, kwargs: {kwargs}") try: result = func(*args, **kwargs) logger.info(f"Tool {func_name} executed successfully.") # Ensure result is a string for consistency return str(result) if result is not None else f"{func_name} completed." except (NoSuchElementException, WebDriverException, TimeoutException) as e: logger.warning(f"Browser error in {func_name}: {e.__class__.__name__} - {str(e).split()[0]}") return f"Error in {func_name}: {e.__class__.__name__} - {str(e).split()[0]}" except Exception as e: logger.error(f"Unexpected error in {func_name}: {e}", exc_info=True) return f"Unexpected error in {func_name}: {e}" return wrapper @browser_tool_handler def visit(url: str, wait_seconds: float = 3.0) -> str: """Navigate the browser to the specified URL and wait for the page to load.""" logger.info(f"Navigating to {url} and waiting {wait_seconds}s...") go_to(url) time.sleep(wait_seconds) # Wait for dynamic content current_url = _browser_driver.current_url return f"Successfully navigated to: {current_url}" @browser_tool_handler def get_text_by_css(selector: str) -> List[str]: """Extract text from all elements matching a CSS selector. Use selector=\"body\" for all visible text.""" logger.info(f"Extracting text using CSS selector: {selector}") if selector.lower() == "body": # Helium Text() might be too broad, let's try body tag first try: body_element = _browser_driver.find_element(By.TAG_NAME, "body") all_text = body_element.text.split("\n") # Split into lines # Filter out empty lines non_empty_text = [line.strip() for line in all_text if line.strip()] logger.info(f"Extracted {len(non_empty_text)} lines of text from body.") return non_empty_text except NoSuchElementException: logger.warning("Could not find body tag, falling back to Helium Text().") elements = find_all(Text()) # Process Helium elements if fallback is used texts = [elem.web_element.text for elem in elements if elem.web_element.is_displayed() and elem.web_element.text.strip()] logger.info(f"Extracted {len(texts)} visible text elements using Helium Text().") return texts else: # Use Selenium directly for more control elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector) texts = [elem.text for elem in elements_selenium if elem.is_displayed() and elem.text.strip()] logger.info(f"Extracted {len(texts)} visible text elements for selector {selector}.") return texts @browser_tool_handler def get_page_html() -> str: """Return the full HTML source of the current page.""" logger.info("Retrieving page HTML source...") return _browser_driver.page_source @browser_tool_handler def click_element_by_css(selector: str, index: int = 0) -> str: """Click on the Nth (0-based index) element matching the CSS selector.""" logger.info(f"Attempting to click element {index} matching selector: {selector}") # Use Selenium directly for finding elements elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector) if not elements_selenium: raise NoSuchElementException(f"No elements found for selector: {selector}") if index >= len(elements_selenium): raise IndexError(f"Index {index} out of bounds. Only {len(elements_selenium)} elements found for selector: {selector}") target_element = elements_selenium[index] if not target_element.is_displayed() or not target_element.is_enabled(): logger.warning(f"Element {index} for selector {selector} is not visible or enabled. Attempting click anyway.") # Try scrolling into view first try: _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element) time.sleep(0.5) except Exception as scroll_err: logger.warning(f"Could not scroll element into view: {scroll_err}") # Use Helium click which might handle overlays better, passing the Selenium element click(target_element) time.sleep(1.5) # Increased wait after click return f"Clicked element {index} matching selector {selector}. Current URL: {_browser_driver.current_url}" @browser_tool_handler def input_text_by_css(selector: str, text: str, index: int = 0, press_enter: bool = False) -> str: """Input text into the Nth (0-based index) element matching the CSS selector. Optionally press Enter.""" logger.info(f"Attempting to input text into element {index} matching selector: {selector}") # Use Selenium directly for finding elements elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector) if not elements_selenium: raise NoSuchElementException(f"No elements found for selector: {selector}") if index >= len(elements_selenium): raise IndexError(f"Index {index} out of bounds. Only {len(elements_selenium)} elements found for selector: {selector}") target_element = elements_selenium[index] if not target_element.is_displayed() or not target_element.is_enabled(): logger.warning(f"Input element {index} for selector {selector} is not visible or enabled. Attempting input anyway.") # Try scrolling into view try: _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element) time.sleep(0.5) except Exception as scroll_err: logger.warning(f"Could not scroll input element into view: {scroll_err}") # Use Helium write, passing the Selenium element write(text, into=target_element) time.sleep(0.5) if press_enter: press(Keys.ENTER) time.sleep(1.5) # Wait longer if Enter was pressed return f"Input text into element {index} ({selector}) and pressed Enter. Current URL: {_browser_driver.current_url}" else: return f"Input text into element {index} ({selector})." @browser_tool_handler def scroll_page(direction: str = "down", amount: str = "page") -> str: """Scroll the page up or down by a specified amount ('page', 'top', 'bottom', or pixels).""" logger.info(f"Scrolling {direction} by {amount}") if direction not in ["up", "down"]: raise ValueError("Direction must be \"up\" or \"down\".") if amount == "page": scroll_script = "window.scrollBy(0, window.innerHeight);" if direction == "down" else "window.scrollBy(0, -window.innerHeight);" elif amount == "top": scroll_script = "window.scrollTo(0, 0);" elif amount == "bottom": scroll_script = "window.scrollTo(0, document.body.scrollHeight);" else: try: pixels = int(amount) scroll_script = f"window.scrollBy(0, {pixels});" if direction == "down" else f"window.scrollBy(0, {-pixels});" except ValueError: raise ValueError("Amount must be \"page\", \"top\", \"bottom\", or a number of pixels.") _browser_driver.execute_script(scroll_script) time.sleep(1) # Wait for scroll effects return f"Scrolled {direction} by {amount}." @browser_tool_handler def go_back() -> str: """Navigate the browser back one step in its history.""" logger.info("Navigating back...") _browser_driver.back() time.sleep(1.5) # Wait after navigation return f"Navigated back. Current URL: {_browser_driver.current_url}" @browser_tool_handler def close_popups() -> str: """Send an ESC keypress to attempt to dismiss modals or pop-ups.""" logger.info("Sending ESC key...") webdriver.ActionChains(_browser_driver).send_keys(Keys.ESCAPE).perform() time.sleep(0.5) return "Sent ESC key press." # --- Search Engine & Data Source Tools --- # --- Agent Initializer Class --- class ResearchAgentInitializer: def __init__(self): logger.info("Initializing ResearchAgent resources...") self.llm = None self.browser_tools = [] self.search_tools = [] self.datasource_tools = [] self.youtube_tool = None # Added for YouTube tool # Initialize LLM self._initialize_llm() # Initialize Browser (conditionally) if SELENIUM_AVAILABLE: self._initialize_browser() self._create_browser_tools() else: logger.warning("Browser tools are disabled as Selenium/Helium are not available.") # Initialize Search/Datasource Tools self._create_search_tools() self._create_datasource_tools() self._create_youtube_tool() # Added logger.info("ResearchAgent resources initialized.") def _initialize_llm(self): agent_llm_model = os.getenv("RESEARCH_AGENT_LLM_MODEL", "models/gemini-1.5-pro") gemini_api_key = os.getenv("GEMINI_API_KEY") if not gemini_api_key: logger.error("GEMINI_API_KEY not found for ResearchAgent LLM.") raise ValueError("GEMINI_API_KEY must be set for ResearchAgent") try: self.llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model) logger.info(f"ResearchAgent LLM initialized: {agent_llm_model}") except Exception as e: logger.error(f"Failed to initialize ResearchAgent LLM: {e}", exc_info=True) raise def _initialize_browser(self): global _browser_instance, _browser_driver if _browser_instance is None: logger.info("Initializing browser (Chrome headless)...") try: chrome_options = webdriver.ChromeOptions() # Configurable options from env vars if os.getenv("RESEARCH_AGENT_CHROME_NO_SANDBOX", "true").lower() == "true": chrome_options.add_argument("--no-sandbox") if os.getenv("RESEARCH_AGENT_CHROME_DISABLE_DEV_SHM", "true").lower() == "true": chrome_options.add_argument("--disable-dev-shm-usage") # Add prefs for downloads/popups chrome_options.add_experimental_option("prefs", { "download.prompt_for_download": False, "plugins.always_open_pdf_externally": True, "profile.default_content_settings.popups": 0 }) # Start Chrome using Helium _browser_instance = start_chrome(headless=True, options=chrome_options) _browser_driver = get_driver() # Get the underlying Selenium driver logger.info("Browser initialized successfully.") except Exception as e: logger.error(f"Failed to initialize browser: {e}", exc_info=True) # Set flags to prevent tool usage global SELENIUM_AVAILABLE SELENIUM_AVAILABLE = False _browser_instance = None _browser_driver = None def _create_browser_tools(self): if not SELENIUM_AVAILABLE: self.browser_tools = [] return self.browser_tools = [ FunctionTool.from_defaults(fn=visit, name="visit_url"), # Renamed for clarity FunctionTool.from_defaults(fn=get_text_by_css, name="get_text_by_css"), FunctionTool.from_defaults(fn=get_page_html, name="get_page_html"), FunctionTool.from_defaults(fn=click_element_by_css, name="click_element_by_css"), FunctionTool.from_defaults(fn=input_text_by_css, name="input_text_by_css"), FunctionTool.from_defaults(fn=scroll_page, name="scroll_page"), FunctionTool.from_defaults(fn=go_back, name="navigate_back"), # Renamed FunctionTool.from_defaults(fn=close_popups, name="close_popups"), ] for tool in self.browser_tools: tool.metadata.description = f"(Browser) {tool.metadata.description}" logger.info(f"Created {len(self.browser_tools)} browser interaction tools.") def _create_search_tools(self): self.search_tools = [] # Google Search google_spec = GoogleSearchToolSpec(key=os.getenv("GOOGLE_API_KEY"), engine=os.getenv("GOOGLE_CSE_ID")) if google_spec: google_tool = FunctionTool.from_defaults(fn=google_spec.google_search, name="google_search") google_tool.metadata.description = "(Search) Execute a Google Custom Search query. Returns structured results." self.search_tools.append(google_tool) # Tavily Search tavily_spec = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY")) if tavily_spec: # Use search method which is more general tavily_tool = FunctionTool.from_defaults(fn=tavily_spec.search, name="tavily_search") tavily_tool.metadata.description = "(Search) Perform a deep research search using Tavily API. Good for finding documents/articles." self.search_tools.append(tavily_tool) # DuckDuckGo Search ddg_spec = DuckDuckGoSearchToolSpec() if ddg_spec: ddg_tool = FunctionTool.from_defaults(fn=ddg_spec.duckduckgo_full_search, name="duckduckgo_search") ddg_tool.metadata.description = "(Search) Execute a DuckDuckGo search. Returns structured results." self.search_tools.append(ddg_tool) logger.info(f"Created {len(self.search_tools)} search engine tools.") def _create_datasource_tools(self): self.datasource_tools = [] # Wikipedia wiki_spec = WikipediaToolSpec() if wiki_spec: wiki_search_tool = FunctionTool.from_defaults(fn=wiki_spec.search_data, name="wikipedia_search_pages") wiki_search_tool.metadata.description = "(Wikipedia) Search for Wikipedia page titles matching a query." wiki_load_tool = FunctionTool.from_defaults(fn=wiki_spec.load_data, name="wikipedia_load_page") wiki_load_tool.metadata.description = "(Wikipedia) Load the full content of a specific Wikipedia page title." self.datasource_tools.extend([wiki_search_tool, wiki_load_tool]) # Yahoo Finance yf_spec = YahooFinanceToolSpec() if yf_spec: yf_tools_map = { "balance_sheet": "Get the latest balance sheet for a stock ticker.", "income_statement": "Get the latest income statement for a stock ticker.", "cash_flow": "Get the latest cash flow statement for a stock ticker.", "stock_basic_info": "Get basic info (price, market cap, summary) for a stock ticker.", "stock_analyst_recommendations": "Get analyst recommendations for a stock ticker.", "stock_news": "Get recent news headlines for a stock ticker." } for func_name, desc in yf_tools_map.items(): if hasattr(yf_spec, func_name): tool = FunctionTool.from_defaults(fn=getattr(yf_spec, func_name), name=f"yahoo_finance_{func_name}") tool.metadata.description = f"(YahooFinance) {desc}" self.datasource_tools.append(tool) else: logger.warning(f"YahooFinance function {func_name} not found in spec.") # ArXiv arxiv_spec = ArxivToolSpec() if arxiv_spec: arxiv_tool = FunctionTool.from_defaults(fn=arxiv_spec.arxiv_query, name="arxiv_search") arxiv_tool.metadata.description = "(ArXiv) Search ArXiv for academic papers matching a query." self.datasource_tools.append(arxiv_tool) logger.info(f"Created {len(self.datasource_tools)} specific data source tools.") def _create_youtube_tool(self): # Added method if YOUTUBE_TRANSCRIPT_API_AVAILABLE: self.youtube_tool = FunctionTool.from_defaults( fn=get_youtube_transcript, name="get_youtube_transcript", description=( "(YouTube) Fetches the transcript text for a given YouTube video URL or video ID. " "Specify preferred languages (e.g., [\"en\", \"es\"]). Returns transcript or error." ) ) logger.info("Created YouTube transcript tool.") else: self.youtube_tool = None logger.warning("YouTube transcript tool disabled because youtube-transcript-api is not installed.") def get_agent(self) -> ReActAgent: """Creates and returns the configured ReActAgent for research.""" logger.info("Creating ResearchAgent ReActAgent instance...") all_tools = self.browser_tools + self.search_tools + self.datasource_tools if self.youtube_tool: # Add YouTube tool if available all_tools.append(self.youtube_tool) if not all_tools: logger.warning("No tools available for ResearchAgent. It will likely be unable to function.") # System prompt (consider loading from file) # Updated prompt to include YouTube tool system_prompt = """\ You are ResearchAgent, an autonomous web research assistant. Your goal is to gather information accurately and efficiently using the available tools. Available Tool Categories: - (Browser): Tools for direct web page interaction (visiting URLs, clicking, scrolling, extracting text/HTML, inputting text). - (Search): Tools for querying search engines (Google, DuckDuckGo, Tavily). - (Wikipedia): Tools for searching and loading Wikipedia pages. - (YahooFinance): Tools for retrieving financial data (balance sheets, income statements, stock info, news). - (ArXiv): Tool for searching academic papers on ArXiv. - (YouTube): Tool for fetching video transcripts (`get_youtube_transcript`). Workflow: 1. **Thought**: Analyze the research goal. Break it down if necessary. Choose the *single best tool* for the *next immediate step*. Explain your choice. Consider the information needed and which tool provides it most directly (e.g., use YahooFinance for stock prices, Google/DDG for general web search, Tavily for document search, ArXiv for papers, Wikipedia for encyclopedic info, YouTube for video transcripts, Browser tools for specific website interaction). 2. **Action**: Call the chosen tool with the correct arguments. Ensure inputs match the tool's requirements (e.g., URL or video ID for YouTube). 3. **Observation**: Examine the tool's output. Extract the relevant information. Check for errors. 4. **Reflect & Iterate**: Does the observation satisfy the immediate goal? Do you have enough information for the overall research task? If not, return to step 1 (Thought) to plan the *next* single step. If a tool failed, consider why and try an alternative tool or approach. 5. **Synthesize**: Once all necessary information is gathered, synthesize the findings into a coherent answer to the original research goal. 6. **Hand-Off**: Pass the synthesized findings to the appropriate next agent: **code_agent** (for coding), **math_agent** (for math), **text_analyzer_agent** (for text analysis), **planner_agent** (for planning/synthesis), or **reasoning_agent** (for logic/reasoning). Constraints: - Use only one tool per Action step. - Think step-by-step. - If using browser tools, start with `visit_url`. - Be mindful of potential errors and try alternative tools if one fails. - Synthesize results *before* handing off. """ agent = ReActAgent( name="research_agent", description=( "Performs web research using browser interaction, search engines (Google, DDG, Tavily), " "specific data sources (Wikipedia, YahooFinance, ArXiv), and YouTube transcript fetching. Follows Thought-Action-Observation loop." ), tools=all_tools, llm=self.llm, system_prompt=system_prompt, can_handoff_to=[ "code_agent", "math_agent", "text_analyzer_agent", # Added based on original prompt "planner_agent", "reasoning_agent" ], ) logger.info("ResearchAgent ReActAgent instance created.") return agent def close_browser(self): """Closes the browser instance if it was initialized.""" global _browser_instance, _browser_driver if _browser_instance: logger.info("Closing browser instance...") try: kill_browser() # Use Helium's function logger.info("Browser closed successfully.") except Exception as e: logger.error(f"Error closing browser: {e}", exc_info=True) finally: _browser_instance = None _browser_driver = None else: logger.info("No active browser instance to close.") # --- Singleton Initializer Instance --- _research_agent_initializer_instance = None def get_research_initializer(): """Gets the singleton instance of ResearchAgentInitializer.""" global _research_agent_initializer_instance if _research_agent_initializer_instance is None: logger.info("Instantiating ResearchAgentInitializer for the first time.") _research_agent_initializer_instance = ResearchAgentInitializer() return _research_agent_initializer_instance # --- Public Initialization Function --- def initialize_research_agent() -> ReActAgent: """Initializes and returns the Research Agent using a singleton initializer.""" logger.info("initialize_research_agent called.") initializer = get_research_initializer() return initializer.get_agent() # --- Cleanup Function (Optional but recommended) --- def cleanup_research_agent_resources(): """Cleans up resources used by the research agent, like the browser.""" logger.info("Cleaning up research agent resources...") initializer = get_research_initializer() # Ensure it exists initializer.close_browser() # Example usage (for testing if run directly) if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger.info("Running research_agent.py directly for testing...") # Check required keys required_keys = ["GEMINI_API_KEY"] # Others are optional depending on tools needed missing_keys = [key for key in required_keys if not os.getenv(key)] if missing_keys: print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.") else: # Warn about optional keys optional_keys = ["GOOGLE_API_KEY", "GOOGLE_CSE_ID", "TAVILY_API_KEY", "WOLFRAM_ALPHA_APP_ID"] missing_optional = [key for key in optional_keys if not os.getenv(key)] if missing_optional: print(f"Warning: Optional environment variable(s) not set: {', '.join(missing_optional)}. Some tools may be unavailable.") test_agent = None try: # Test YouTube transcript tool directly if YOUTUBE_TRANSCRIPT_API_AVAILABLE: print("\nTesting YouTube transcript tool...") # Example video: "Attention is All You Need" paper explanation yt_url = "https://www.youtube.com/watch?v=TQQlZhbC5ps" transcript = get_youtube_transcript(yt_url) if not transcript.startswith("Error:"): print(f"Transcript fetched (first 500 chars):\n{transcript[:500]}...") else: print(f"YouTube Transcript Fetch Failed: {transcript}") else: print("\nSkipping YouTube transcript test as youtube-transcript-api is not available.") # Initialize agent AFTER testing standalone functions test_agent = initialize_research_agent() print("\nResearch Agent initialized successfully for testing.") # Example test (requires browser tools to be available) # if SELENIUM_AVAILABLE: # print("\nTesting browser visit...") # result = test_agent.chat("Visit https://example.com and tell me the main heading text using CSS selector 'h1'") # print(f"Test query result: {result}") # else: # print("\nSkipping browser test as Selenium/Helium are not available.") # Example search test (requires GOOGLE keys) # if os.getenv("GOOGLE_API_KEY") and os.getenv("GOOGLE_CSE_ID"): # print("\nTesting Google Search...") # result_search = test_agent.chat("Search for 'LlamaIndex Agent Workflow'") # print(f"Search test result: {result_search}") # else: # print("\nSkipping Google Search test as API keys are not set.") except Exception as e: print(f"Error during testing: {e}") finally: # Clean up browser if it was started if test_agent: print("\nCleaning up resources...") cleanup_research_agent_resources()