import os
import time
import logging
import re
from functools import lru_cache, wraps
from typing import Optional, Dict

from requests.exceptions import RequestException
import wikipedia
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from llama_index.readers.web import BeautifulSoupWebReader
from smolagents import (
    CodeAgent,
    InferenceClientModel,
    GoogleSearchTool,
    tool,
    Tool,
)

# --- Configuration and Setup ---
def configure_logging():
    """Sets up detailed logging configuration."""
    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")


def get_api_keys_from_env() -> Dict[str, Optional[str]]:
    """Retrieves API keys directly from environment variables."""
    keys = {
        'together': os.getenv('TOGETHER_API_KEY'),
        'serpapi': os.getenv('SERPAPI_API_KEY')
    }
    if not keys['together']:
        raise ValueError("TOGETHER_API_KEY is required but not found in environment variables.")
    return keys

# --- Custom Exceptions ---
class SerpApiClientException(Exception): pass
class YouTubeTranscriptApiError(Exception): pass

# --- Decorators ---
def retry(max_retries=3, initial_delay=1, backoff=2):
    """A robust retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name and docstring for logging and introspection
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except (RequestException, SerpApiClientException, YouTubeTranscriptApiError, TranscriptsDisabled, NoTranscriptFound) as e:
                    if attempt == max_retries:
                        logging.error(f"{func.__name__} failed after {attempt} attempts: {e}")
                        return f"Tool Error: {func.__name__} failed after {max_retries} attempts. Details: {e}"
                    time.sleep(initial_delay * (backoff ** (attempt - 1)))
                except Exception as e:
                    logging.error(f"{func.__name__} failed with a non-retryable error: {e}")
                    return f"Tool Error: A non-retryable error occurred in {func.__name__}: {e}"
        return wrapper
    return decorator
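
# Illustrative sketch only: how a flaky network call could be wrapped with `retry`.
# `_example_flaky_fetch` is a hypothetical helper (not used by the agent); it exists
# purely to document the decorator's intended usage under these assumptions.
@retry(max_retries=2, initial_delay=0.5)
def _example_flaky_fetch(url: str) -> str:
    """Hypothetical example: fetch a page, retrying on transport/HTTP errors."""
    import requests
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # raises a RequestException subclass on HTTP errors, which triggers a retry
    return response.text[:500]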

# --- Answer Formatting and Extraction ---
def extract_final_answer(response: str) -> str:
    """Extracts the final answer from the agent's full response string."""
    if not response:
        return ""
    match = re.search(r'FINAL\s+ANSWER\s*:\s*(.*)', response, re.IGNORECASE | re.DOTALL)
    if match:
        return match.group(1).strip()
    lines = response.strip().split('\n')
    return lines[-1].strip() if lines else ""
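
# Illustrative behaviour of `extract_final_answer` (examples, not executed):
#   "Reasoning...\nFINAL ANSWER: 42"      -> "42"
#   "FINAL ANSWER: Paris, London"          -> "Paris, London"
#   "No marker here\njust the last line"   -> "just the last line"   (fallback: last line of the response)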

def normalize_answer_format(answer: str) -> str:
    """Normalizes the extracted answer to meet strict GAIA formatting requirements."""
    if not answer:
        return ""
    answer = answer.strip().rstrip('.')
    # Check for a numeric answer first (after dropping commas, currency and percent signs)
    # so values like "1,234" or "$1,500" are not misread as comma-separated lists.
    stripped_numeric = re.sub(r'[,$%\s]', '', answer)
    try:
        float(stripped_numeric)
        is_numeric = True
    except ValueError:
        is_numeric = False
    if is_numeric:
        return stripped_numeric
    if ',' in answer and len(answer.split(',')) > 1:
        elements = [normalize_answer_format(elem.strip()) for elem in answer.split(',')]
        return ', '.join(elements)
    return answer
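
# Illustrative behaviour of `normalize_answer_format` (examples, not executed):
#   "$1,234."                -> "1234"                    (currency and commas stripped from numbers)
#   "42%"                    -> "42"
#   "apples, oranges, pears" -> "apples, oranges, pears"  (lists keep comma + space separators)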

# --- Agent Wrapper for GAIA Compliance ---
def create_gaia_agent_wrapper(agent: CodeAgent):
    """Creates a callable wrapper around the agent to enforce GAIA answer formatting."""
    def gaia_compliant_agent(question: str) -> str:
        logging.info(f"Received question for GAIA compliant agent: '{question}'")
        # agent.run() may return a non-string object, so coerce it before parsing.
        full_response = str(agent.run(question))
        logging.info(f"Agent raw response:\n---\n{full_response}\n---")
        final_answer = extract_final_answer(full_response)
        normalized_answer = normalize_answer_format(final_answer)
        logging.info(f"Normalized answer for submission: '{normalized_answer}'")
        return normalized_answer
    return gaia_compliant_agent
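
# Intended usage sketch (assuming `agent` is an already-initialized CodeAgent):
#   gaia_agent = create_gaia_agent_wrapper(agent)
#   answer = gaia_agent("What is the square root of 2025?")   # -> e.g. "45"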

# --- Tool Implementations (with robustness decorators) ---
@retry()
def _get_webpage_content_implementation(url: str) -> str:
    logging.info(f"Reading webpage content from: {url}")
    loader = BeautifulSoupWebReader()
    docs = loader.load_data(urls=[url])
    if not docs or not docs[0].text:
        raise ValueError(f"No content could be extracted from {url}")
    return docs[0].text[:15000]

@retry()
def _get_youtube_transcript_implementation(video_url: str) -> str:
    logging.info(f"Fetching YouTube transcript for: {video_url}")
    video_id_match = re.search(r'(?:v=|\/)([a-zA-Z0-9_-]{11}).*', video_url)
    if not video_id_match:
        return "Error: Invalid YouTube URL provided."
    video_id = video_id_match.group(1)
    try:
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        transcript_text = ' '.join([t['text'] for t in transcript_list])
        return transcript_text[:15000]
    except (TranscriptsDisabled, NoTranscriptFound) as e:
        logging.error(f"Could not retrieve transcript for {video_url}: {e}")
        raise YouTubeTranscriptApiError(f"Transcript not available for video {video_id}.") from e
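
# The video-id regex above handles the common URL forms, e.g. (illustrative only):
#   https://www.youtube.com/watch?v=dQw4w9WgXcQ   -> id "dQw4w9WgXcQ"
#   https://youtu.be/dQw4w9WgXcQ                  -> id "dQw4w9WgXcQ"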

def _wikipedia_search_implementation(query: str) -> str:
    try:
        return wikipedia.summary(query, sentences=5)
    except wikipedia.exceptions.PageError:
        return f"No Wikipedia page found for '{query}'."
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Ambiguous query '{query}'. Options: {', '.join(e.options[:3])}"
    except Exception as e:
        return f"An error occurred during Wikipedia search: {e}"

# --- Tool Interfaces (for the agent) ---
@tool
def get_webpage_content(url: str) -> str:
    """
    Extracts the text content from a single webpage.

    Args:
        url (str): The full URL of the webpage to read.
    """
    return _get_webpage_content_implementation(url)


@tool
def get_youtube_transcript(video_url: str) -> str:
    """
    Fetches the full transcript of a YouTube video as a single string.

    Args:
        video_url (str): The full URL of the YouTube video.
    """
    return _get_youtube_transcript_implementation(video_url)


@tool
def wikipedia_search(query: str) -> str:
    """
    Searches Wikipedia for a given query and returns a summary.

    Args:
        query (str): The term or question to search for on Wikipedia.
    """
    return _wikipedia_search_implementation(query)
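
# Quick local smoke-test sketch (illustrative only, kept as comments so nothing runs at import time):
#   print(wikipedia_search("Python (programming language)"))
#   print(get_webpage_content("https://example.com"))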

def initialize_agent():
    """Initializes the enhanced multi-disciplinary agent for the GAIA benchmark."""
    configure_logging()
    logging.info("Starting GAIA agent initialization...")
    try:
        api_keys = get_api_keys_from_env()
    except ValueError as e:
        logging.error(f"FATAL: {e}")
        return None
    try:
        model = InferenceClientModel(model_id="Qwen/Qwen3-235B-A22B-FP8", token=api_keys['together'], provider="together")
        logging.info("Primary model Qwen/Qwen3-235B-A22B-FP8 loaded successfully")
    except Exception as e:
        logging.warning(f"Failed to load primary model, falling back. Error: {e}")
        model = InferenceClientModel(model_id="Qwen/Qwen2.5-7B-Instruct", token=api_keys['together'], provider="together")
        logging.info("Fallback model (Qwen 2.5 7B) loaded successfully")
    google_search_tool = GoogleSearchTool() if api_keys['serpapi'] else None
    tools_list = [
        t for t in [
            google_search_tool,
            get_webpage_content,
            get_youtube_transcript,
            wikipedia_search
        ] if t
    ]
    agent = CodeAgent(
        model=model,
        tools=tools_list,
instructions="""You are a master AI assistant for the GAIA benchmark. Your goal is to provide a single, precise, and final answer by writing and executing Python code. | |
**STRATEGY:** | |
You have a powerful toolkit. You can write and execute any Python code you need. You also have access to pre-defined tools that you can call from within your code to gather information. | |
1. **Analyze**: Break down the user's question into logical steps. | |
2. **Plan**: Decide if you need to search the web, read a webpage, get a video transcript, or perform a calculation. | |
3. **Execute**: Write a Python script to perform the steps. You must always use the `<code>...</code>` format to wrap your code. | |
**HOW TO USE TOOLS IN YOUR CODE:** | |
To solve a problem, you will write a Python code block that calls the necessary tools. | |
*Example 1: Simple Calculation* | |
Thought: The user wants to know 15! / (12! * 3!). I will use the math library to calculate the factorials and then perform the division. | |
<code> | |
import math | |
result = math.factorial(15) / (math.factorial(12) * math.factorial(3)) | |
print(int(result)) | |
</code> | |
*Example 2: Multi-step question involving web search and reading a page* | |
Thought: I need to find the name of the journal that published a specific article. First, I will use the Google Search tool to find the webpage for the article. Then, I will use the `get_webpage_content` tool to read the text of that page. Finally, I will analyze the text to find the journal's name and print it. | |
<code> | |
# First, find the URL of the paper. | |
search_results = GoogleSearchTool(query="A Rapid and Sensitive Method for the Quantitation of Microgram Quantities of Protein Utilizing the Principle of Protein-Dye Binding") | |
# Let's assume the first result has a good URL, like "https://www.sciencedirect.com/science/article/pii/0003269776905271" | |
# Now, read the content of that page to find the journal name. | |
page_content = get_webpage_content(url="https://www.sciencedirect.com/science/article/pii/0003269776905271") | |
# Now I will analyze the text `page_content` in my head to find the journal name. | |
# After reading the text, I found the journal is "Analytical Biochemistry". | |
print("Analytical Biochemistry") | |
</code> | |
**CRITICAL INSTRUCTION:** You MUST end your entire response with the line `FINAL ANSWER: [Your Final Answer]`. This is the only part of your response that will be graded. Adhere to strict formatting: no extra words, no currency symbols, no commas in numbers. | |
""" | |
) | |
logging.info("π― GAIA agent with unified CodeAgent architecture initialized successfully!") | |
return create_gaia_agent_wrapper(agent) | |

# --- Main Execution Block for Local Testing ---
def main():
    """
    Tests the agent with sample GAIA-style questions.
    For local testing, ensure you have set the required environment variables:
        export TOGETHER_API_KEY="your_key"
        export SERPAPI_API_KEY="your_key"
    """
    configure_logging()
    logging.info("Starting local agent testing...")
    agent = initialize_agent()
    if not agent:
        logging.critical("Agent initialization failed. Exiting.")
        return
    test_questions = [
        "What is 15! / (12! * 3!)?",
        "In what year was the Python programming language first released?",
        "What is the square root of 2025?",
    ]
    for i, question in enumerate(test_questions, 1):
        logging.info(f"\n{'='*60}\nTest Question {i}: {question}\n{'='*60}")
        start_time = time.time()
        final_answer = agent(question)
        elapsed_time = time.time() - start_time
        logging.info(f"Submitted Answer: {final_answer}")
        logging.info(f"Execution time: {elapsed_time:.2f} seconds")
        time.sleep(1)
    logging.info(f"\n{'='*60}\nTesting complete!\n{'='*60}")


if __name__ == "__main__":
    main()