Clémentine
slight reorg
80d548a
raw
history blame
7.67 kB
from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import re
import time
from datetime import datetime
import globals
from utils.io import save_results, load_models_providers
from typing import Optional
def extract_score_from_job(job_id: str) -> Optional[float]:
"""Extract average score from completed job logs.
Parses the results table and calculates the average of the main metric
for each task (the metric on the same line as the task name).
"""
try:
# Inspect the job to get details and logs
logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
scores = []
for line in logs:
# Find the results table
# Look for lines that match the pattern: |task_name|version|metric|value|...|
# We want to extract the score (value) from lines where the task name is not empty
if '|' in line:
parts = [p.strip() for p in line.split('|')]
# Skip header and separator lines
# Table format: | Task | Version | Metric | Value | | Stderr |
if len(parts) == 8:
_, task, _, metric, value, _, _, _ = parts
# Is the task name correct
if task and task in [t.replace("|", ":") for t in globals.TASKS.split(",")]:
# Try to extract numeric value
# Remove any extra characters and convert to float
score = float(value)
scores.append(score)
print(f"Extracted score {score} for task '{task}' metric '{metric}'")
# Calculate average of all task scores
if scores:
average_score = sum(scores) / len(scores)
print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
return average_score
else:
print("No scores found in job logs")
return None
except Exception as e:
print(f"Error extracting score for job {job_id}: {e}")
import traceback
traceback.print_exc()
return None
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS) -> Optional[str]:
"""Run a single job for a model-provider combination."""
if not model or not provider:
print("Missing model or provider")
return -1
# Verify the model-provider combination exists in the config
models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
if (model, provider) not in models_providers:
print( f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
return -1
# Check if job is already running
key = globals.get_model_provider_key(model, provider)
if key in globals.job_results:
current_status = globals.job_results[key].get("status")
if current_status == "RUNNING":
print( f"Job for {model} on {provider} is already running. Please wait for it to complete.")
return -1
print(f"Starting job for model={model}, provider={provider}")
job = run_job(
image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
command=[
"lighteval", "endpoint", "inference-providers",
f"model_name={model},provider={provider}",
tasks,
"--push-to-hub", "--save-details",
"--results-org", "IPTesting",
"--max-samples", "10"
],
namespace=globals.NAMESPACE,
secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
token=os.getenv("HF_TOKEN")
)
job_id = job.id
key = globals.get_model_provider_key(model, provider)
with globals.results_lock:
# Move current score to previous score if it exists (relaunching)
previous_score = None
if key in globals.job_results and globals.job_results[key].get("current_score", None) is not None:
previous_score = globals.job_results[key]["current_score"]
globals.job_results[key] = {
"model": model,
"provider": provider,
"last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"status": "RUNNING",
"current_score": None,
"previous_score": previous_score,
"job_id": job_id
}
save_results()
print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
return job_id
# Todo: factorize both following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
"""Launch jobs for all models and providers."""
models_providers = load_models_providers(config_file)
if not models_providers:
print("No valid model-provider combinations found in config file")
return "No valid model-provider combinations found"
print(f"Found {len(models_providers)} model-provider combinations")
launched_count = 0
for model, provider in models_providers:
job_id = run_single_job(model, provider, tasks)
if job_id != -1:
launched_count += 1
# Small delay between launches to avoid rate limiting
time.sleep(2)
print(f"Launched {launched_count}/{len(models_providers)} jobs successfully")
return f"Launched {launched_count} jobs"
def relaunch_failed_jobs():
"""Relaunch only failed model-provider combinations from job results."""
if not globals.job_results:
return "No existing jobs to relaunch"
failed_jobs = [(key, info) for key, info in globals.job_results.items()
if info.get("status") in ["ERROR", "FAILED"]]
if not failed_jobs:
return "No failed jobs to relaunch"
relaunched_count = 0
for key, info in failed_jobs:
model = info["model"]
provider = info["provider"]
job_id = run_single_job(model, provider, globals.TASKS)
if job_id != -1:
relaunched_count += 1
time.sleep(2) # Small delay between launches to avoid rate limiting
return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"
def update_job_statuses() -> None:
"""Check and update the status of active jobs."""
try:
keys = list(globals.job_results.keys())
for key in keys:
try:
job_id = globals.job_results[key]["job_id"]
job_info = inspect_job(job_id=job_id, namespace=globals.NAMESPACE)
new_status = job_info.status.stage
with globals.results_lock:
old_status = globals.job_results[key]["status"]
if old_status != new_status:
globals.job_results[key]["status"] = new_status
print(f"Job {job_id} status changed: {old_status} -> {new_status}")
# If job completed, try to extract score
if new_status == "COMPLETED":
score = extract_score_from_job(job_id)
if score is not None:
globals.job_results[key]["current_score"] = score
if new_status == "COMPLETED" and globals.job_results[key]["current_score"] is None:
score = extract_score_from_job(job_id)
if score is not None:
globals.job_results[key]["current_score"] = score
except Exception as e:
print(f"Error checking job: {str(e)}")
save_results()
except Exception as e:
print(f"Error in update_job_statuses: {str(e)}")