Spaces:
Sleeping
Sleeping
File size: 7,671 Bytes
8dafde0 7f5506e 096bf86 7f5506e 8dafde0 7f5506e 8dafde0 7f5506e 8dafde0 7f5506e 32de2c3 7f5506e 32de2c3 7f5506e 8580b64 7f5506e 6e99f98 7f5506e 510a6cc 7f5506e 80d548a 7f5506e 80d548a 7f5506e 32de2c3 7f5506e 32de2c3 7f5506e a791654 7f5506e 8dafde0 7f5506e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import os
import re
import time
from datetime import datetime
from typing import Optional, Union

from huggingface_hub import run_job, inspect_job, fetch_job_logs

import globals
from utils.io import save_results, load_models_providers
def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a completed job's logs.

    Parses the lighteval results table printed in the job logs and averages
    the main metric of each configured task (the metric that appears on the
    same line as the task name).

    Args:
        job_id: Identifier of the job whose logs should be parsed.

    Returns:
        The average of the per-task scores, or None when no score could be
        extracted (empty logs, malformed table, or a fetch error).
    """
    try:
        # Fetch the job logs; the results table is embedded in them.
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
        # Task names use "|" in the TASKS config string but ":" in the
        # results table; build the normalized set once, outside the loop.
        valid_tasks = {t.replace("|", ":") for t in globals.TASKS.split(",")}
        scores = []
        for line in logs:
            # Results-table rows look like: | Task | Version | Metric | Value | | Stderr |
            if '|' not in line:
                continue
            parts = [p.strip() for p in line.split('|')]
            # Leading/trailing pipes yield exactly 8 parts after splitting;
            # anything else is a header, separator, or unrelated log line.
            if len(parts) != 8:
                continue
            _, task, _, metric, value, _, _, _ = parts
            # Only rows whose task name matches a configured task carry the
            # main metric we want.
            if not task or task not in valid_tasks:
                continue
            try:
                score = float(value)
            except ValueError:
                # Non-numeric cell (e.g. a stray header row) — skip this row
                # instead of aborting the whole extraction.
                continue
            scores.append(score)
            print(f"Extracted score {score} for task '{task}' metric '{metric}'")
        # Average across all tasks that produced a score.
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        print("No scores found in job logs")
        return None
    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS) -> Union[str, int]:
    """Run a single evaluation job for a model-provider combination.

    Args:
        model: Model name to evaluate.
        provider: Inference provider to use.
        tasks: Comma-separated task string (default captured from
            globals.TASKS at import time).

    Returns:
        The launched job id (str) on success, or -1 (int) on failure:
        missing arguments, unknown combination, or a job for this
        combination already running. Callers compare against -1.
    """
    if not model or not provider:
        print("Missing model or provider")
        return -1
    # Verify the model-provider combination exists in the config
    models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
    if (model, provider) not in models_providers:
        print(f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
        return -1
    # Refuse to relaunch while a previous run for this combination is
    # still in flight. Key is computed once and reused below.
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        current_status = globals.job_results[key].get("status")
        if current_status == "RUNNING":
            print(f"Job for {model} on {provider} is already running. Please wait for it to complete.")
            return -1
    print(f"Starting job for model={model}, provider={provider}")
    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
            "--max-samples", "10"
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )
    job_id = job.id
    with globals.results_lock:
        # Move current score to previous score if it exists (relaunching)
        previous_score = None
        if key in globals.job_results and globals.job_results[key].get("current_score", None) is not None:
            previous_score = globals.job_results[key]["current_score"]
        globals.job_results[key] = {
            "model": model,
            "provider": provider,
            "last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": "RUNNING",
            "current_score": None,
            "previous_score": previous_score,
            "job_id": job_id
        }
        save_results()
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
    return job_id
# Todo: factorize both following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch one evaluation job per model-provider pair in the config file.

    Args:
        tasks: Comma-separated task string passed to each job.
        config_file: Path of the config listing model-provider pairs.

    Returns:
        A human-readable summary string of how many jobs were launched.
    """
    pairs = load_models_providers(config_file)
    if not pairs:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"
    print(f"Found {len(pairs)} model-provider combinations")
    launched = 0
    for mdl, prov in pairs:
        # run_single_job returns -1 on failure, a job id otherwise.
        if run_single_job(mdl, prov, tasks) != -1:
            launched += 1
        # Small delay between launches to avoid rate limiting
        time.sleep(2)
    print(f"Launched {launched}/{len(pairs)} jobs successfully")
    return f"Launched {launched} jobs"
def relaunch_failed_jobs():
    """Relaunch only the model-provider combinations whose last job failed.

    Scans globals.job_results for entries whose status is ERROR or FAILED
    and launches a fresh job for each one.

    Returns:
        A human-readable summary string of how many jobs were relaunched.
    """
    if not globals.job_results:
        return "No existing jobs to relaunch"
    failed = []
    for key, info in globals.job_results.items():
        if info.get("status") in ("ERROR", "FAILED"):
            failed.append((key, info))
    if not failed:
        return "No failed jobs to relaunch"
    relaunched = 0
    for _key, info in failed:
        # A -1 return signals the launch did not go through.
        if run_single_job(info["model"], info["provider"], globals.TASKS) != -1:
            relaunched += 1
        time.sleep(2)  # Small delay between launches to avoid rate limiting
    return f"Relaunched {relaunched}/{len(failed)} failed jobs"
def update_job_statuses() -> None:
    """Check and update the status of all tracked jobs.

    For each entry in globals.job_results, queries the job's current stage,
    records status changes, and — once a job is COMPLETED and has no score
    yet — parses its logs to fill in current_score. Results are persisted
    after each job is processed.
    """
    try:
        # Snapshot the keys so concurrent inserts can't break iteration.
        keys = list(globals.job_results.keys())
        for key in keys:
            try:
                job_id = globals.job_results[key]["job_id"]
                job_info = inspect_job(job_id=job_id, namespace=globals.NAMESPACE)
                new_status = job_info.status.stage
                with globals.results_lock:
                    old_status = globals.job_results[key]["status"]
                    if old_status != new_status:
                        globals.job_results[key]["status"] = new_status
                        print(f"Job {job_id} status changed: {old_status} -> {new_status}")
                    # Single consolidated score check. The previous version
                    # had this twice (once inside the status-change branch,
                    # once after it), which fetched and parsed the logs twice
                    # when a job had just flipped to COMPLETED and the first
                    # parse yielded no score.
                    if new_status == "COMPLETED" and globals.job_results[key]["current_score"] is None:
                        score = extract_score_from_job(job_id)
                        if score is not None:
                            globals.job_results[key]["current_score"] = score
            except Exception as e:
                print(f"Error checking job: {str(e)}")
            save_results()
    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")
|