import os
import time
import logging
import re
from functools import lru_cache, wraps
from typing import Optional, Dict
from requests.exceptions import RequestException
import wikipedia
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from llama_index.readers.web import BeautifulSoupWebReader
from smolagents import (
CodeAgent,
InferenceClientModel,
GoogleSearchTool,
tool,
Tool,
)
# --- Configuration and Setup ---
def configure_logging():
"""Sets up detailed logging configuration."""
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
def get_api_keys_from_env() -> Dict[str, Optional[str]]:
"""Retrieves API keys directly from environment variables."""
keys = {
'together': os.getenv('TOGETHER_API_KEY'),
'serpapi': os.getenv('SERPAPI_API_KEY')
}
if not keys['together']:
raise ValueError("TOGETHER_API_KEY is required but not found in environment variables.")
return keys
# --- Custom Exceptions ---
class SerpApiClientException(Exception): pass
class YouTubeTranscriptApiError(Exception): pass
# --- Decorators ---
def retry(max_retries=3, initial_delay=1, backoff=2):
"""A robust retry decorator with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_retries + 1):
try:
return func(*args, **kwargs)
except (RequestException, SerpApiClientException, YouTubeTranscriptApiError, TranscriptsDisabled, NoTranscriptFound) as e:
if attempt == max_retries:
logging.error(f"{func.__name__} failed after {attempt} attempts: {e}")
return f"Tool Error: {func.__name__} failed after {max_retries} attempts. Details: {e}"
time.sleep(initial_delay * (backoff ** (attempt - 1)))
except Exception as e:
logging.error(f"{func.__name__} failed with a non-retryable error: {e}")
return f"Tool Error: A non-retryable error occurred in {func.__name__}: {e}"
return wrapper
return decorator
# --- Answer Formatting and Extraction ---
def extract_final_answer(response: str) -> str:
"""Extracts the final answer from the agent's full response string."""
if not response: return ""
match = re.search(r'FINAL\s+ANSWER\s*:\s*(.*)', response, re.IGNORECASE | re.DOTALL)
if match: return match.group(1).strip()
lines = response.strip().split('\n')
return lines[-1].strip() if lines else ""
def normalize_answer_format(answer: str) -> str:
"""Normalizes the extracted answer to meet strict GAIA formatting requirements."""
if not answer: return ""
answer = answer.strip().rstrip('.')
is_list = ',' in answer and len(answer.split(',')) > 1
try:
is_numeric = not is_list and float(answer.replace(',', '')) is not None
except ValueError:
is_numeric = False
if is_numeric: return re.sub(r'[,$%]', '', answer).strip()
if is_list:
elements = [normalize_answer_format(elem.strip()) for elem in answer.split(',')]
return ', '.join(elements)
return answer
# --- Agent Wrapper for GAIA Compliance ---
def create_gaia_agent_wrapper(agent: CodeAgent):
"""Creates a callable wrapper around the agent to enforce GAIA answer formatting."""
def gaia_compliant_agent(question: str) -> str:
logging.info(f"Received question for GAIA compliant agent: '{question}'")
full_response = agent.run(question)
logging.info(f"Agent raw response:\n---\n{full_response}\n---")
final_answer = extract_final_answer(full_response)
normalized_answer = normalize_answer_format(final_answer)
logging.info(f"Normalized answer for submission: '{normalized_answer}'")
return normalized_answer
return gaia_compliant_agent
# --- Tool Implementations (with robustness decorators) ---
@retry
@lru_cache(maxsize=128)
def _get_webpage_content_implementation(url: str) -> str:
logging.info(f"๐ Reading webpage content from: {url}")
loader = BeautifulSoupWebReader()
docs = loader.load_data(urls=[url])
if not docs or not docs[0].text:
raise ValueError(f"No content could be extracted from {url}")
return docs[0].text[:15000]
@retry
@lru_cache(maxsize=128)
def _get_youtube_transcript_implementation(video_url: str) -> str:
logging.info(f"๐ฌ Fetching YouTube transcript for: {video_url}")
video_id_match = re.search(r'(?:v=|\/)([a-zA-Z0-9_-]{11}).*', video_url)
if not video_id_match:
return "Error: Invalid YouTube URL provided."
video_id = video_id_match.group(1)
try:
transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
transcript_text = ' '.join([t['text'] for t in transcript_list])
return transcript_text[:15000]
except (TranscriptsDisabled, NoTranscriptFound) as e:
logging.error(f"Could not retrieve transcript for {video_url}: {e}")
raise YouTubeTranscriptApiError(f"Transcript not available for video {video_id}.") from e
@retry
@lru_cache(maxsize=32)
def _wikipedia_search_implementation(query: str) -> str:
try:
return wikipedia.summary(query, sentences=5)
except wikipedia.exceptions.PageError:
return f"No Wikipedia page found for '{query}'."
except wikipedia.exceptions.DisambiguationError as e:
return f"Ambiguous query '{query}'. Options: {', '.join(e.options[:3])}"
except Exception as e:
return f"An error occurred during Wikipedia search: {e}"
# --- Tool Interfaces (for the agent) ---
@tool
def get_webpage_content(url: str) -> str:
"""
Extracts the text content from a single webpage.
Args:
url (str): The full URL of the webpage to read.
"""
return _get_webpage_content_implementation(url)
@tool
def get_youtube_transcript(video_url: str) -> str:
"""
Fetches the full transcript of a YouTube video as a single string.
Args:
video_url (str): The full URL of the YouTube video.
"""
return _get_youtube_transcript_implementation(video_url)
@tool
def wikipedia_search(query: str) -> str:
"""
Searches Wikipedia for a given query and returns a summary.
Args:
query (str): The term or question to search for on Wikipedia.
"""
return _wikipedia_search_implementation(query)
def initialize_agent():
"""Initializes the enhanced multi-disciplinary agent for the GAIA benchmark."""
configure_logging()
logging.info("๐ Starting GAIA agent initialization...")
try:
api_keys = get_api_keys_from_env()
except ValueError as e:
logging.error(f"FATAL: {e}")
return None
try:
model = InferenceClientModel(model_id="Qwen/Qwen3-235B-A22B-FP8", token=api_keys['together'], provider="together")
logging.info("โ
Primary model dQwen/Qwen3-235B-A22B-FP8 loaded successfully")
except Exception as e:
logging.warning(f"โ ๏ธ Failed to load primary model, falling back. Error: {e}")
model = InferenceClientModel(model_id="Qwen/Qwen2.5-7B-Instruct", token=api_keys['together'], provider="together")
logging.info("โ
Fallback model (Qwen 2.5 7B) loaded successfully")
google_search_tool = GoogleSearchTool() if api_keys['serpapi'] else None
tools_list = [
tool for tool in [
google_search_tool,
get_webpage_content,
get_youtube_transcript,
wikipedia_search
] if tool
]
agent = CodeAgent(
model=model,
tools=tools_list,
instructions="""You are a master AI assistant for the GAIA benchmark. Your goal is to provide a single, precise, and final answer by writing and executing Python code.
**STRATEGY:**
You have a powerful toolkit. You can write and execute any Python code you need. You also have access to pre-defined tools that you can call from within your code to gather information.
1. **Analyze**: Break down the user's question into logical steps.
2. **Plan**: Decide if you need to search the web, read a webpage, get a video transcript, or perform a calculation.
3. **Execute**: Write a Python script to perform the steps. You must always use the `...
` format to wrap your code.
**HOW TO USE TOOLS IN YOUR CODE:**
To solve a problem, you will write a Python code block that calls the necessary tools.
*Example 1: Simple Calculation*
Thought: The user wants to know 15! / (12! * 3!). I will use the math library to calculate the factorials and then perform the division.
import math
result = math.factorial(15) / (math.factorial(12) * math.factorial(3))
print(int(result))
*Example 2: Multi-step question involving web search and reading a page*
Thought: I need to find the name of the journal that published a specific article. First, I will use the Google Search tool to find the webpage for the article. Then, I will use the `get_webpage_content` tool to read the text of that page. Finally, I will analyze the text to find the journal's name and print it.
# First, find the URL of the paper.
search_results = GoogleSearchTool(query="A Rapid and Sensitive Method for the Quantitation of Microgram Quantities of Protein Utilizing the Principle of Protein-Dye Binding")
# Let's assume the first result has a good URL, like "https://www.sciencedirect.com/science/article/pii/0003269776905271"
# Now, read the content of that page to find the journal name.
page_content = get_webpage_content(url="https://www.sciencedirect.com/science/article/pii/0003269776905271")
# Now I will analyze the text `page_content` in my head to find the journal name.
# After reading the text, I found the journal is "Analytical Biochemistry".
print("Analytical Biochemistry")
**CRITICAL INSTRUCTION:** You MUST end your entire response with the line `FINAL ANSWER: [Your Final Answer]`. This is the only part of your response that will be graded. Adhere to strict formatting: no extra words, no currency symbols, no commas in numbers.
"""
)
logging.info("๐ฏ GAIA agent with unified CodeAgent architecture initialized successfully!")
return create_gaia_agent_wrapper(agent)
# --- Main Execution Block for Local Testing ---
def main():
"""
Tests the agent with sample GAIA-style questions.
For local testing, ensure you have set the required environment variables:
export TOGETHER_API_KEY="your_key"
export SERPAPI_API_KEY="your_key"
"""
configure_logging()
logging.info("๐งช Starting local agent testing...")
agent = initialize_agent()
if not agent:
logging.critical("๐ฅ Agent initialization failed. Exiting.")
return
test_questions = [
"What is 15! / (12! * 3!)?",
"In what year was the Python programming language first released?",
"What is the square root of 2025?",
]
for i, question in enumerate(test_questions, 1):
logging.info(f"\n{'='*60}\n๐ Test Question {i}: {question}\n{'='*60}")
start_time = time.time()
final_answer = agent(question)
elapsed_time = time.time() - start_time
logging.info(f"โ
Submitted Answer: {final_answer}")
logging.info(f"โฑ๏ธ Execution time: {elapsed_time:.2f} seconds")
time.sleep(1)
logging.info(f"\n{'='*60}\n๐ Testing complete!\n{'='*60}")
if __name__ == "__main__":
main()