Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import requests | |
import inspect | |
import pandas as pd | |
from agents import agents | |
from PIL import Image | |
from io import BytesIO | |
import whisper | |
# (Keep Constants as is) | |
# --- Constants --- | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
# --- Load Agent --- | |
# 1. Instantiate Agent ( modify this part to create your agent) | |
agent = None | |
def select_agent(provider_name:str, model_name: str): | |
""" | |
Selects the agent based on the provided name. | |
:param agent_name: Name of the agent to select. | |
:return: The selected agent instance. | |
""" | |
global agent | |
try: | |
agent = agents.get_agent(model_name=model_name, model_type=provider_name) | |
if agent is None: | |
print(f"Agent not found for provider: {provider_name} and model: {model_name}") | |
agent = BasicAgent() | |
except Exception as e: | |
print(f"Error selecting agent: {e}") | |
agent = BasicAgent() | |
# Update ui to indicate the selected agent | |
print(f"Agent selected: {agent.model}") | |
agent_info_text.value = get_agent_info() | |
return agent | |
def get_agent_info() -> str: | |
global agent | |
if (agent is None): | |
return "No agent selected." | |
try: | |
# Get the agent's class name | |
agent_class_name = agent.__class__.__name__ | |
# Get the agent's model name | |
model_name = agent.model | |
# Get the agent's docstring | |
docstring = inspect.getdoc(agent) | |
# Format the information | |
info = f"Agent Class: {agent_class_name}\nModel Name: {model_name}\nDocstring: {docstring}" | |
return info | |
except Exception as e: | |
print(f"Error getting agent info: {e}") | |
return "Error getting agent info." | |
# --- Basic Agent Definition --- | |
# ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------ | |
class BasicAgent: | |
def __init__(self): | |
print("BasicAgent initialized.") | |
def __call__(self, question: str) -> str: | |
print(f"Agent received question (first 50 chars): {question[:50]}...") | |
fixed_answer = "This is a default answer." | |
print(f"Agent returning fixed answer: {fixed_answer}") | |
return fixed_answer | |
def get_all_questions(): | |
""" | |
Fetches all available questions from the API. | |
""" | |
yield from run_test_on_questions(False, False) | |
def run_test_on_all_questions(): | |
""" | |
Runs tests on all available questions by forwarding yields from run_test_on_questions. | |
""" | |
yield from run_test_on_questions(False, True) | |
def run_test_on_random_question(): | |
""" | |
Runs a single test on a random available question by forwarding yields from run_test_on_questions. | |
""" | |
yield from run_test_on_questions(True, True) | |
def run_test_on_questions(use_random_question: bool, run_agent:bool): | |
""" | |
Fetches all questions, runs the BasicAgent on them, submits all answers, | |
and displays the results. | |
""" | |
global agent | |
api_url = DEFAULT_API_URL | |
questions_url = f"{api_url}/random-question" if use_random_question else f"{api_url}/questions" | |
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public) | |
info = "# started request" | |
yield info, None | |
# 2. Fetch Questions | |
print(f"Fetching questions from: {questions_url}") | |
try: | |
response = requests.get(questions_url, timeout=15) | |
response.raise_for_status() | |
questions_dataset_raw = response.json() | |
questions_dataset = [questions_dataset_raw] if use_random_question else questions_dataset_raw | |
yield info, None | |
if not questions_dataset: | |
print("Fetched questions list is empty.") | |
yield info +"\n\nFetched questions list is empty or invalid format.", None | |
return | |
print(f"Fetched {len(questions_dataset)} questions.") | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching questions: {e}") | |
yield f"Error fetching questions: {e}", None | |
return | |
except requests.exceptions.JSONDecodeError as e: | |
print(f"Error decoding JSON response from questions endpoint: {e}") | |
print(f"Response text: {response.text[:500]}") | |
yield f"Error decoding server response for questions: {e}", None | |
return | |
except Exception as e: | |
print(f"An unexpected error occurred fetching questions: {e}") | |
yield f"An unexpected error occurred fetching questions: {e}", None | |
return | |
# 3. Run your Agent | |
results_log = [] | |
answers_payload = [] | |
# loop over all questions | |
for i, questions_data in enumerate(questions_dataset): | |
agent.memory.reset() | |
images = [] | |
task_id = questions_data.get("task_id") | |
question_text = questions_data.get("question") | |
file_name = questions_data.get("file_name") | |
if (file_name != "" and file_name is not None): | |
question_text = question_text + f"\n\nYou can download the correspondig file using the download tool with the task id: {task_id}." | |
fileData = requests.get(f"{DEFAULT_API_URL}/files/{task_id}") | |
# check if file is an image | |
if fileData.headers['Content-Type'] in ['image/png', 'image/jpeg']: | |
image = Image.open(BytesIO(fileData.content)).convert("RGB") | |
images = [image] | |
if fileData.headers['Content-Type'] in ['audio/mpeg', 'audio/wav']: | |
# Load the audio file using Whisper | |
model = whisper.load_model("base") | |
# MP3-Datei von der API abrufen | |
with open("temp_audio.mp3", "wb") as f: | |
f.write(fileData.content) | |
# Transkription durchführen | |
audioContent = model.transcribe("temp_audio.mp3") | |
question_text = question_text + f"\n\nTranscription: {audioContent['text']}" | |
info += f"\n\nRunning agent on question {i+1}/{len(questions_dataset)}:\n - task_id: {task_id}\n - question: {question_text}" | |
yield info, None | |
if not task_id or question_text is None: | |
yield info+ f"\nError in question data: {questions_data}", None | |
return | |
try: | |
submitted_answer = agent.run(question_text, images=images) if run_agent else "-- no agent interaction --" | |
info += f"\n - got answer {submitted_answer}" | |
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "FileInfo": file_name}) | |
except Exception as e: | |
print(f"Error running agent on task {task_id}: {e}") | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}", "FileInfo": file_name}) | |
if not answers_payload: | |
print("Agent did not produce any answers.") | |
yield info + "\nAgent did not produce any answers.", pd.DataFrame(results_log) | |
return | |
# 5. Submit | |
try: | |
results_df = pd.DataFrame(results_log) | |
yield info + "\nGot an answer from agent", results_df | |
except Exception as e: | |
status_message = f"An unexpected error occurred during submission: {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
yield status_message, results_df | |
return | |
def run_and_submit_all( profile: gr.OAuthProfile | None): | |
""" | |
Fetches all questions, runs the BasicAgent on them, submits all answers, | |
and displays the results. | |
""" | |
return "We are not there yet", None | |
# --- Determine HF Space Runtime URL and Repo URL --- | |
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code | |
if profile: | |
username= f"{profile.username}" | |
print(f"User logged in: {username}") | |
else: | |
print("User not logged in.") | |
return "Please Login to Hugging Face with the button.", None | |
api_url = DEFAULT_API_URL | |
questions_url = f"{api_url}/questions" | |
submit_url = f"{api_url}/submit" | |
# 1. Instantiate Agent ( modify this part to create your agent) | |
try: | |
agent = BasicAgent() | |
except Exception as e: | |
print(f"Error instantiating agent: {e}") | |
return f"Error initializing agent: {e}", None | |
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public) | |
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" | |
print(agent_code) | |
# 2. Fetch Questions | |
print(f"Fetching questions from: {questions_url}") | |
try: | |
response = requests.get(questions_url, timeout=15) | |
response.raise_for_status() | |
questions_data = response.json() | |
if not questions_data: | |
print("Fetched questions list is empty.") | |
return "Fetched questions list is empty or invalid format.", None | |
print(f"Fetched {len(questions_data)} questions.") | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching questions: {e}") | |
return f"Error fetching questions: {e}", None | |
except requests.exceptions.JSONDecodeError as e: | |
print(f"Error decoding JSON response from questions endpoint: {e}") | |
print(f"Response text: {response.text[:500]}") | |
return f"Error decoding server response for questions: {e}", None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching questions: {e}") | |
return f"An unexpected error occurred fetching questions: {e}", None | |
# 3. Run your Agent | |
results_log = [] | |
answers_payload = [] | |
print(f"Running agent on {len(questions_data)} questions...") | |
for item in questions_data: | |
task_id = item.get("task_id") | |
question_text = item.get("question") | |
if not task_id or question_text is None: | |
print(f"Skipping item with missing task_id or question: {item}") | |
continue | |
try: | |
submitted_answer = agent(question_text) | |
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) | |
except Exception as e: | |
print(f"Error running agent on task {task_id}: {e}") | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) | |
if not answers_payload: | |
print("Agent did not produce any answers to submit.") | |
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) | |
# 4. Prepare Submission | |
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} | |
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." | |
print(status_update) | |
# 5. Submit | |
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") | |
try: | |
response = requests.post(submit_url, json=submission_data, timeout=60) | |
response.raise_for_status() | |
result_data = response.json() | |
final_status = ( | |
f"Submission Successful!\n" | |
f"User: {result_data.get('username')}\n" | |
f"Overall Score: {result_data.get('score', 'N/A')}% " | |
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" | |
f"Message: {result_data.get('message', 'No message received.')}" | |
) | |
print("Submission successful.") | |
results_df = pd.DataFrame(results_log) | |
return final_status, results_df | |
except requests.exceptions.HTTPError as e: | |
error_detail = f"Server responded with status {e.response.status_code}." | |
try: | |
error_json = e.response.json() | |
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" | |
except requests.exceptions.JSONDecodeError: | |
error_detail += f" Response: {e.response.text[:500]}" | |
status_message = f"Submission Failed: {error_detail}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except requests.exceptions.Timeout: | |
status_message = "Submission Failed: The request timed out." | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except requests.exceptions.RequestException as e: | |
status_message = f"Submission Failed: Network error - {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except Exception as e: | |
status_message = f"An unexpected error occurred during submission: {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
def fetch_ollama_models() -> list: | |
""" | |
Fetches available models from the Ollama server. | |
:return: List of available models. | |
""" | |
try: | |
response = requests.get("http://localhost:11434/api/tags") | |
response.raise_for_status() | |
data = response.json() | |
return [model["name"] for model in data["models"]] | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching Ollama models: {e}") | |
return ["None"] | |
def fetch_lmstudio_models() -> list: | |
""" | |
Fetches available models from the LM Studio server. | |
:return: List of available models. | |
""" | |
try: | |
response = requests.get("http://localhost:1234/v1/models") | |
response.raise_for_status() | |
data = response.json() | |
return [model["id"] for model in data["data"]] | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching LM Studio models: {e}") | |
return ["None"] | |
available_models = ["None"] | |
def update_available_models(provider:str): | |
""" | |
Fetches available models based on the selected provider. | |
:param provider: The selected provider name. | |
:return: Update object for the model dropdown. | |
""" | |
global available_models | |
print(f"Selected provider: {provider}") | |
match provider: | |
case "hugging face": | |
available_models = ["None", ""] | |
case "Ollama": | |
available_models = fetch_ollama_models() | |
case "LMStudio": | |
available_models = fetch_lmstudio_models() | |
case "Gemini": | |
available_models = ["None", "Gemini-2.0-flash-exp", "Gemini-2.0-flash-lite"] | |
case "Anthropic": | |
available_models = ["None", "claude-3"] # just for later options, model name possibly wrong | |
case "OpenAI": | |
available_models = ["None", "gpt-4o", "gpt-3.5-turbo"] # just for later options, model name possibly wrong | |
case "Basic Agent": | |
available_models = ["None"] | |
case _: | |
available_models = ["None"] | |
print(f"Available models for {provider}: {available_models}") | |
return gr.Dropdown(choices=available_models) | |
# --- Build Gradio Interface using Blocks --- | |
with gr.Blocks() as demo: | |
gr.Markdown("# Basic Agent Evaluation Runner") | |
agent_info_text = gr.Text(label="Agent Name", value=get_agent_info(), interactive=False, visible=True) | |
gr.Markdown( | |
""" | |
**Instructions:** | |
Select a provider and then model to generate the agent. | |
""" | |
) | |
provider_select = gr.Dropdown( | |
label="Select Provider", | |
choices=["Basic Agent", "LMStudio", "Ollama", "hugging face", "Gemini", "Anthropic", "OpenAI"], | |
interactive=True, | |
visible=True, | |
multiselect=False) | |
model_select = gr.Dropdown( | |
label="Select Model", | |
choices=available_models, | |
interactive=True, | |
visible=True, | |
multiselect=False) | |
# changing the provider will change the available models | |
provider_select.input(fn=update_available_models, inputs=provider_select, outputs=[model_select]) | |
# changing a model will update the agent (see select_agent) | |
model_select.change(fn=select_agent, inputs=[provider_select, model_select]) | |
# in case of running on HF space, we support the login button | |
# we somehow need to find out, if this is running on HF space or not | |
gr.LoginButton() | |
run_button = gr.Button("Run Evaluation & Submit All Answers") | |
run_test_button = gr.Button("Run Test on Random Question") | |
run_multiple_tests_button = gr.Button("Run tests on all questions") | |
run_get_questions_button = gr.Button("Get Questions") | |
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) | |
# Removed max_rows=10 from DataFrame constructor | |
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) | |
run_test_button.click( | |
fn=run_test_on_random_question, | |
outputs=[status_output, results_table] | |
) | |
run_multiple_tests_button.click( | |
fn=run_test_on_all_questions, | |
outputs=[status_output, results_table] | |
) | |
run_button.click( | |
fn=run_and_submit_all, | |
outputs=[status_output, results_table] | |
) | |
run_get_questions_button.click( | |
fn=get_all_questions, | |
outputs=[status_output, results_table] | |
) | |
if __name__ == "__main__": | |
print("\n" + "-"*30 + " App Starting " + "-"*30) | |
# Check for SPACE_HOST and SPACE_ID at startup for information | |
space_host_startup = os.getenv("SPACE_HOST") | |
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup | |
if space_host_startup: | |
print(f"✅ SPACE_HOST found: {space_host_startup}") | |
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
else: | |
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") | |
if space_id_startup: # Print repo URLs if SPACE_ID is found | |
print(f"✅ SPACE_ID found: {space_id_startup}") | |
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
else: | |
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
print("-"*(60 + len(" App Starting ")) + "\n") | |
print("Launching Gradio Interface for Basic Agent Evaluation...") | |
demo.launch(debug=True, share=False) |