import os
import statistics
import traceback
from datetime import datetime
from typing import Optional

from huggingface_hub import run_job, inspect_job, fetch_job_logs

import globals
from utils.io import save_results, load_models_providers

def extract_score_from_job(job_id: str) -> Optional[float]:
"""Extract average score from completed job logs.
Parses the results table and calculates the average of the main metric
for each task (the metric on the same line as the task name).
"""
try:
# Inspect the job to get details and logs
logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
scores = []
for line in logs:
# Find the results table
# Look for lines that match the pattern: |task_name|version|metric|value|...|
# We want to extract the score (value) from lines where the task name is not empty
if '|' in line:
parts = [p.strip() for p in line.split('|')]
# Skip header and separator lines
# Table format: | Task | Version | Metric | Value | | Stderr |
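                # An illustrative (hypothetical) data row that would match:
                # |lighteval:gsm8k:0|0|acc|0.8123| |0.0107|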
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _ = parts
                    # Keep only rows whose task name matches one of the configured
                    # tasks (the config uses '|' separators, the table uses ':')
                    if task and task in [t.replace("|", ":") for t in globals.TASKS.split(",")]:
                        # Convert the value to a float, skipping rows where it
                        # is not numeric (e.g. header or separator rows)
                        try:
                            score = float(value)
                        except ValueError:
                            continue
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")
# Calculate average of all task scores
if scores:
average_score = sum(scores) / len(scores)
print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
return average_score
else:
print("No scores found in job logs")
return None
    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        traceback.print_exc()
        return None

def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> Optional[str]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model ID
        provider: Provider name
        tasks: Tasks to run
        run_number: Which run this is (1 to NUM_RUNS_PER_JOB for multiple runs)

    Returns:
        The launched job ID, or None if no job was launched
    """
    if not model or not provider:
        print("Missing model or provider")
        return None
# Check if this specific run number is already running for this model-provider
key = globals.get_model_provider_key(model, provider)
if key in globals.job_results:
runs = globals.job_results[key].get("runs", [])
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. Please wait for it to complete.")
                return None
print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")
job = run_job(
image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
command=[
"lighteval", "endpoint", "inference-providers",
f"model_name={model},provider={provider}",
tasks,
"--push-to-hub", "--save-details",
"--results-org", "IPTesting",
],
namespace=globals.NAMESPACE,
secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
token=os.getenv("HF_TOKEN")
)
job_id = job.id
start_time = datetime.now()
with globals.results_lock:
# Initialize or update the job result
if key not in globals.job_results:
# First run - initialize the structure
previous_score = None
globals.job_results[key] = {
"model": model,
"provider": provider,
"last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "RUNNING",
"current_score": None,
"previous_score": None,
"job_id": job_id,
"start_time": start_time.isoformat(),
"duration": None,
"completed_at": None,
"runs": []
}
else:
# Subsequent run or relaunch
previous_score = globals.job_results[key].get("current_score")
globals.job_results[key]["status"] = "RUNNING"
globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
globals.job_results[key]["start_time"] = start_time.isoformat()
globals.job_results[key]["previous_score"] = previous_score
# Add this run to the runs list
globals.job_results[key]["runs"].append({
"run_number": run_number,
"job_id": job_id,
"status": "RUNNING",
"score": None,
"start_time": start_time.isoformat(),
"duration": None,
"completed_at": None
})
# Don't save immediately - let the periodic save handle it
print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
return job_id
def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched
    """
    job_ids = []
    for run_number in range(1, num_runs + 1):
        job_id = run_single_job(model, provider, tasks, run_number=run_number)
        if job_id is not None:
            job_ids.append(job_id)
    return job_ids
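
# Illustrative usage (the model and provider names below are hypothetical):
#   job_ids = run_multiple_jobs("meta-llama/Llama-3.1-8B-Instruct", "together")
#   -> launches NUM_RUNS_PER_JOB jobs and returns the IDs of those that started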
# TODO: factorize the two functions below
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
"""Launch jobs for all models and providers with multiple runs per combination."""
models_providers = load_models_providers(config_file)
if not models_providers:
print("No valid model-provider combinations found in config file")
return "No valid model-provider combinations found"
print(f"Found {len(models_providers)} model-provider combinations")
print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")
launched_count = 0
for model, provider in models_providers:
job_ids = run_multiple_jobs(model, provider, tasks)
if job_ids:
launched_count += len(job_ids)
# Save all results once after launching all jobs
save_results()
total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
print(f"Launched {launched_count}/{total_expected} jobs successfully")
return f"Launched {launched_count}/{total_expected} jobs ({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)"
def relaunch_failed_jobs():
"""Relaunch only failed model-provider combinations from job results."""
if not globals.job_results:
return "No existing jobs to relaunch"
failed_jobs = [(key, info) for key, info in globals.job_results.items()
if info.get("status") in ["ERROR", "FAILED"]]
if not failed_jobs:
return "No failed jobs to relaunch"
relaunched_count = 0
for key, info in failed_jobs:
model = info["model"]
provider = info["provider"]
job_id = run_single_job(model, provider, globals.TASKS)
        if job_id is not None:
relaunched_count += 1
# Save all results once after relaunching all failed jobs
save_results()
return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"
def update_job_statuses() -> None:
"""Check and update the status of active jobs and aggregate scores from multiple runs."""
try:
keys = list(globals.job_results.keys())
for key in keys:
try:
with globals.results_lock:
runs = globals.job_results[key].get("runs", [])
if not runs:
# Legacy format - no runs list
continue
# Check status of each run
all_completed = True
all_failed = True
any_running = False
for run in runs:
if run["status"] == "RUNNING":
# Check if this run's job is still running
try:
job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
new_status = job_info.status.stage
if run["status"] != new_status:
run["status"] = new_status
print(f"Run {run['run_number']} job {run['job_id']} status changed: {run['status']} -> {new_status}")
if new_status == "COMPLETED":
completed_time = datetime.now()
run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")
# Calculate duration
if run.get("start_time"):
start_time = datetime.fromisoformat(run["start_time"])
run["duration"] = (completed_time - start_time).total_seconds()
# Extract score
score = extract_score_from_job(run["job_id"])
if score is not None:
run["score"] = score
print(f"Run {run['run_number']}: extracted score {score:.4f}")
except Exception as e:
print(f"Error checking run {run['run_number']}: {e}")
# Update aggregate status flags
if run["status"] == "RUNNING":
any_running = True
all_completed = False
all_failed = False
elif run["status"] == "COMPLETED":
all_failed = False
elif run["status"] in ["ERROR", "FAILED"]:
all_completed = False
# Update overall status
if any_running:
globals.job_results[key]["status"] = "RUNNING"
elif all_completed:
globals.job_results[key]["status"] = "COMPLETED"
# Calculate aggregate statistics from completed runs
completed_scores = [run["score"] for run in runs if run["status"] == "COMPLETED" and run["score"] is not None]
completed_durations = [run["duration"] for run in runs if run["status"] == "COMPLETED" and run.get("duration") is not None]
                        if completed_scores:
                            mean_score = statistics.mean(completed_scores)
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0
globals.job_results[key]["current_score"] = mean_score
globals.job_results[key]["score_variance"] = variance
print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")
# Calculate average duration
if completed_durations:
mean_duration = statistics.mean(completed_durations)
globals.job_results[key]["duration"] = mean_duration
print(f"Average duration: {mean_duration:.2f} seconds")
# Update completion time to latest run
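                        # String max() is safe here: "%Y-%m-%d %H:%M:%S" timestamps sort lexicographically in chronological order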
latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")], default=None)
if latest_completion:
globals.job_results[key]["completed_at"] = latest_completion
elif all_failed:
globals.job_results[key]["status"] = "ERROR"
            except Exception as e:
                print(f"Error checking job {key}: {e}")
                traceback.print_exc()
save_results()
    except Exception as e:
        print(f"Error in update_job_statuses: {e}")
        traceback.print_exc()
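
# Minimal driver sketch (an illustrative addition, not part of the original module):
# launch all configured jobs, then poll until no combination is still RUNNING.
# The 60-second poll interval is an assumption, not a value from this codebase.
if __name__ == "__main__":
    import time

    launch_jobs()
    while any(info.get("status") == "RUNNING" for info in globals.job_results.values()):
        time.sleep(60)
        update_job_statuses()
    print("All jobs finished")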