# NOTE(review): "Spaces: Sleeping" header is residue from the Hugging Face
# Space file viewer this code was copied from; it is not part of the module.
| from huggingface_hub import run_job, inspect_job, fetch_job_logs | |
| import os | |
| import re | |
| import time | |
| from datetime import datetime | |
| import globals | |
| from utils.io import save_results, load_models_providers | |
| from typing import Optional | |
def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a completed job's logs.

    Fetches the job logs, parses the lighteval results table and averages
    the main metric for each configured task (the metric that appears on
    the same line as the task name).

    Args:
        job_id: Identifier of the job whose logs are parsed.

    Returns:
        The average of the per-task scores, or ``None`` when no scores
        were found or the logs could not be fetched.
    """
    try:
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
        # Task names in the table use ":" where TASKS uses "|"; build the
        # lookup set once instead of rebuilding it for every log line.
        valid_tasks = {t.replace("|", ":") for t in globals.TASKS.split(",")}
        scores = []
        for line in logs:
            # Result rows look like: | Task | Version | Metric | Value | | Stderr | |
            if '|' not in line:
                continue
            parts = [p.strip() for p in line.split('|')]
            if len(parts) != 8:
                # Skip headers, separator lines and anything that is not a row.
                continue
            _, task, _, metric, value, _, _, _ = parts
            if task and task in valid_tasks:
                try:
                    score = float(value)
                except ValueError:
                    # A single malformed value must not abort the whole parse
                    # (previously it bubbled to the outer except and discarded
                    # every score already collected).
                    continue
                scores.append(score)
                print(f"Extracted score {score} for task '{task}' metric '{metric}'")
        # Average the main metric over all matched tasks.
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        print("No scores found in job logs")
        return None
    except Exception as e:
        # Best-effort: log the failure and signal "no score" to the caller.
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS) -> Optional[str]:
    """Run a single evaluation job for a model-provider combination.

    Args:
        model: Model name to evaluate.
        provider: Inference provider to run the model on.
        tasks: Comma-separated lighteval task string.

    Returns:
        The launched job id, or ``-1`` when the job could not be launched
        (missing arguments, unknown combination, or already running).
        NOTE: callers compare against ``-1``, so the sentinel is kept for
        backward compatibility instead of returning ``None``.
    """
    if not model or not provider:
        print("Missing model or provider")
        return -1
    # Verify the model-provider combination exists in the config
    models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
    if (model, provider) not in models_providers:
        print(f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
        return -1
    # Compute the tracking key once; it is reused when recording the result.
    key = globals.get_model_provider_key(model, provider)
    # Refuse to launch a second job while one is still running for this pair.
    if key in globals.job_results:
        current_status = globals.job_results[key].get("status")
        if current_status == "RUNNING":
            print(f"Job for {model} on {provider} is already running. Please wait for it to complete.")
            return -1
    print(f"Starting job for model={model}, provider={provider}")
    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
            "--max-samples", "10"
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )
    job_id = job.id
    with globals.results_lock:
        # Move current score to previous score if it exists (relaunching)
        previous_score = None
        if key in globals.job_results and globals.job_results[key].get("current_score", None) is not None:
            previous_score = globals.job_results[key]["current_score"]
        globals.job_results[key] = {
            "model": model,
            "provider": provider,
            "last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": "RUNNING",
            "current_score": None,
            "previous_score": previous_score,
            "job_id": job_id
        }
        save_results()
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
    return job_id
| # Todo: factorize both following functions | |
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch evaluation jobs for every model-provider pair in the config file."""
    combos = load_models_providers(config_file)
    if not combos:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"
    print(f"Found {len(combos)} model-provider combinations")
    launched = 0
    for model, provider in combos:
        # run_single_job returns -1 on any failure to launch.
        if run_single_job(model, provider, tasks) != -1:
            launched += 1
        # Small delay between launches to avoid rate limiting
        time.sleep(2)
    print(f"Launched {launched}/{len(combos)} jobs successfully")
    return f"Launched {launched} jobs"
def relaunch_failed_jobs():
    """Relaunch only failed model-provider combinations from job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"
    # Collect the pairs whose last run ended in an error state.
    failed_jobs = [
        (key, info)
        for key, info in globals.job_results.items()
        if info.get("status") in ["ERROR", "FAILED"]
    ]
    if not failed_jobs:
        return "No failed jobs to relaunch"
    relaunched = 0
    for _key, info in failed_jobs:
        outcome = run_single_job(info["model"], info["provider"], globals.TASKS)
        if outcome != -1:
            relaunched += 1
        time.sleep(2)  # Small delay between launches to avoid rate limiting
    return f"Relaunched {relaunched}/{len(failed_jobs)} failed jobs"
def update_job_statuses() -> None:
    """Check and update the status of active jobs.

    Polls each tracked job, records status transitions and, once a job
    reports COMPLETED and has no score yet, extracts its score from the
    logs. Results are persisted via save_results().
    """
    try:
        # Snapshot the keys so concurrent inserts cannot break iteration.
        keys = list(globals.job_results.keys())
        for key in keys:
            try:
                job_id = globals.job_results[key]["job_id"]
                job_info = inspect_job(job_id=job_id, namespace=globals.NAMESPACE)
                new_status = job_info.status.stage
                with globals.results_lock:
                    old_status = globals.job_results[key]["status"]
                    if old_status != new_status:
                        globals.job_results[key]["status"] = new_status
                        print(f"Job {job_id} status changed: {old_status} -> {new_status}")
                    # One check covers both a fresh COMPLETED transition and a
                    # job that completed earlier without a recorded score, and
                    # avoids fetching/parsing the logs twice in a single pass
                    # (the previous code had the same extraction duplicated).
                    if new_status == "COMPLETED" and globals.job_results[key]["current_score"] is None:
                        score = extract_score_from_job(job_id)
                        if score is not None:
                            globals.job_results[key]["current_score"] = score
            except Exception as e:
                # One bad job must not stop the polling loop.
                print(f"Error checking job: {str(e)}")
        # NOTE(review): the scraped source's indentation is ambiguous here;
        # persisting once after the loop — confirm save_results() was not
        # intended to run per job.
        save_results()
    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")