import os
import tempfile

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import InferenceClient
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.prompts import PromptTemplate
from langchain_experimental.tools import PythonREPLTool
from langchain_huggingface import HuggingFaceEndpoint

# Optional dependencies, only needed for PDF and Excel attachments.
try:
    from langchain_community.document_loaders import PyPDFLoader
    import openpyxl  # noqa: F401  (used implicitly by pandas.read_excel)
except ImportError:
    pass


DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


class BasicAgent:
    def __init__(self, api_url):
        print("Advanced BasicAgent initialized with free HF models (no API key).")

        # Free instruction-tuned text model served through the HF Inference API.
        self.llm = HuggingFaceEndpoint(
            repo_id="mistralai/Mistral-7B-Instruct-v0.2",
            temperature=0.1,
            max_new_tokens=500,
        )

        # Free image-captioning model for describing image attachments.
        self.vision_client = InferenceClient(model="Salesforce/blip-image-captioning-large")

        self.search_tool = DuckDuckGoSearchRun(name="web_search", description="Search the web for information.")
        self.python_tool = PythonREPLTool(description="Execute Python code for calculations or data processing. Input should be valid Python code.")

        self.file_tool = Tool(
            name="process_file",
            func=self._process_file,
            description="Download and process a file associated with a task. Input format: 'task_id: <id>, file_name: <name>'"
        )

        self.tools = [self.search_tool, self.python_tool, self.file_tool]

        # create_react_agent requires the prompt to expose {tools}, {tool_names}
        # and {agent_scratchpad}; the question itself is passed in under {input}.
        self.prompt_template = PromptTemplate.from_template("""
You are an expert AI agent solving GAIA benchmark questions. These questions require reasoning, tool use, and sometimes file processing.

If the question mentions a file or attachment, use the 'process_file' tool; the exact 'task_id: <id>, file_name: <name>' input is provided alongside the question.

You have access to the following tools:

{tools}

Use the following format:

Question: the question you must answer
Thought: think step-by-step about what to do next
Action: the action to take, one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation cycle can repeat)
Thought: I now know the final answer
Final Answer: ONLY the final answer, in the exact format required by the question. No explanations, no extra text.

Begin!

Question: {input}
{agent_scratchpad}
""")

        self.agent = create_react_agent(self.llm, self.tools, self.prompt_template)
        self.executor = AgentExecutor(agent=self.agent, tools=self.tools, verbose=True, handle_parsing_errors=True, max_iterations=10)

        self.api_url = api_url

    def _process_file(self, input_str: str) -> str:
        """Download and process the file attached to a task (no API keys required)."""
        try:
            # Expected input, e.g. "task_id: <id>, file_name: <name>".
            parts = dict(part.strip().split(': ', 1) for part in input_str.split(', '))
            task_id = parts.get('task_id')
            file_name = parts.get('file_name')
            if not task_id or not file_name:
                return "Invalid input for process_file. Need 'task_id' and 'file_name'."

            # Download the attachment from the scoring API.
            file_url = f"{self.api_url}/files/{task_id}"
            response = requests.get(file_url, timeout=10)
            response.raise_for_status()

            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_name)[1]) as tmp:
                tmp.write(response.content)
                file_path = tmp.name

            ext = os.path.splitext(file_name)[1].lower()

            try:
                if ext in ['.jpg', '.jpeg', '.png', '.gif']:
                    # Caption the image with the BLIP model.
                    with open(file_path, "rb") as img_file:
                        caption = self.vision_client.image_to_text(img_file)
                    # Newer huggingface_hub versions return an ImageToTextOutput
                    # object, older ones a plain string; handle both.
                    return str(getattr(caption, "generated_text", caption))

                elif ext == '.pdf':
                    loader = PyPDFLoader(file_path)
                    docs = loader.load()
                    text = "\n\n".join(doc.page_content for doc in docs)
                    return text[:20000]

                elif ext in ['.xlsx', '.xls']:
                    df = pd.read_excel(file_path)
                    return df.to_string()

                else:
                    # Fall back to reading the file as plain text.
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        return f.read()[:20000]
            finally:
                # Always clean up the temporary file.
                os.unlink(file_path)

        except Exception as e:
            return f"Error processing file: {str(e)}"

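    # A minimal sketch of how the 'process_file' tool is invoked, assuming a
    # made-up task id and file name (real values come from the /questions endpoint):
    #
    #     agent._process_file("task_id: abc-123, file_name: sales.xlsx")
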
    def __call__(self, question: str, task_id: str, file_name: str | None = None) -> str:
        print(f"Agent processing question (first 50 chars): {question[:50]}... (task_id: {task_id}, file: {file_name})")
        input_prompt = question
        if file_name:
            input_prompt += f"\nThere is an attached file '{file_name}'. Use the 'process_file' tool with 'task_id: {task_id}, file_name: {file_name}' to access it."

        try:
            # The ReAct prompt expects the question under the 'input' key.
            response = self.executor.invoke({"input": input_prompt})
            answer = response['output'].strip()
            print(f"Agent returning answer: {answer}")
            return answer
        except Exception as e:
            print(f"Error generating answer: {e}")
            return "Agent error occurred."

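# A rough local smoke test, assuming the scoring API is reachable and the free
# HF inference endpoints accept anonymous requests; "demo-task" is a placeholder
# id, not a real GAIA task:
#
#     agent = BasicAgent(DEFAULT_API_URL)
#     print(agent("What is 2 + 2?", task_id="demo-task"))

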
def run_and_submit_all(profile: gr.OAuthProfile | None):
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = profile.username
        print(f"User logged in: {username}")
    else:
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent(api_url)
    except Exception as e:
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers_payload = []
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name")
        if not task_id or not question_text:
            continue
        try:
            submitted_answer = agent(question_text, task_id, file_name)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission Failed: {e}", pd.DataFrame(results_log)


with gr.Blocks() as demo:
    gr.Markdown("# Advanced Agent Evaluation Runner for GAIA (No API Key Required)")
    gr.Markdown(
        """
        **Instructions:**
        1. Log in to Hugging Face.
        2. Click 'Run Evaluation & Submit All Answers'.

        This agent uses free Hugging Face models (Mistral for text, BLIP for vision) with tools for search, code, and files to aim for 30-50%+ scores without any API keys.
        """
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])


if __name__ == "__main__":
    demo.launch(debug=True, share=False)