from typing import Optional

import os
import statistics
import time
import traceback
from datetime import datetime

from huggingface_hub import run_job, inspect_job, fetch_job_logs

import globals
from utils.io import save_results, load_models_providers

def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a completed job's logs.

    Parses the results table and averages the main metric of each task
    (the metric that appears on the same line as the task name).
    """
    try:
        # Fetch the job's logs
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)

        scores = []
        expected_tasks = [t.replace("|", ":") for t in globals.TASKS.split(",")]
        for line in logs:
            # Find the results table: look for lines matching the pattern
            # |task_name|version|metric|value|...| and extract the value
            # when the task name is one of the tasks we launched.
            if '|' in line:
                parts = [p.strip() for p in line.split('|')]
                # Skip header and separator lines.
                # Table format: | Task | Version | Metric | Value | | Stderr |
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _ = parts
                    if task and task in expected_tasks:
                        # Convert the value to a float; skip non-numeric cells
                        try:
                            score = float(value)
                        except ValueError:
                            continue
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        # Average the per-task scores
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        else:
            print("No scores found in job logs")
            return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        traceback.print_exc()
        return None

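# For illustration, a results-table log line that the parser above would match
# might look like the following (the task name, metric, and values here are
# hypothetical; the actual table is printed by lighteval at the end of a job):
#
#   |extended:ifeval:0|      0|prompt_level_strict_acc|0.8521|   |0.0153|
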
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> Optional[str]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model ID
        provider: Provider name
        tasks: Tasks to run
        run_number: Which run this is (1 to globals.NUM_RUNS_PER_JOB)

    Returns:
        The launched job's ID, or None if no job was launched.
    """
    if not model or not provider:
        print("Missing model or provider")
        return None

    # Skip if this specific run number is already running for this model-provider
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        runs = globals.job_results[key].get("runs", [])
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. Please wait for it to complete.")
                return None

    print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")
    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN"),
    )
    job_id = job.id
    start_time = datetime.now()

    with globals.results_lock:
        # Initialize or update the job result
        if key not in globals.job_results:
            # First run - initialize the structure
            globals.job_results[key] = {
                "model": model,
                "provider": provider,
                "last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "RUNNING",
                "current_score": None,
                "previous_score": None,
                "job_id": job_id,
                "start_time": start_time.isoformat(),
                "duration": None,
                "completed_at": None,
                "runs": []
            }
        else:
            # Subsequent run or relaunch
            previous_score = globals.job_results[key].get("current_score")
            globals.job_results[key]["status"] = "RUNNING"
            globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
            globals.job_results[key]["start_time"] = start_time.isoformat()
            globals.job_results[key]["previous_score"] = previous_score

        # Add this run to the runs list
        globals.job_results[key]["runs"].append({
            "run_number": run_number,
            "job_id": job_id,
            "status": "RUNNING",
            "score": None,
            "start_time": start_time.isoformat(),
            "duration": None,
            "completed_at": None
        })
        # Don't save immediately - let the periodic save handle it

    print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
    return job_id

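# For reference, the run_job call above amounts to executing a CLI command of
# this shape inside the job image (the model and provider shown here are
# hypothetical examples):
#
#   lighteval endpoint inference-providers \
#       "model_name=meta-llama/Llama-3.1-8B-Instruct,provider=together" \
#       <tasks> \
#       --push-to-hub --save-details --results-org IPTesting
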
def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched
    """
    job_ids = []
    for run_number in range(1, num_runs + 1):
        job_id = run_single_job(model, provider, tasks, run_number=run_number)
        if job_id is not None:
            job_ids.append(job_id)
    return job_ids

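# Minimal usage sketch (the model ID and provider are hypothetical; any pair
# listed in the config file works the same way):
#
#   job_ids = run_multiple_jobs("meta-llama/Llama-3.1-8B-Instruct", "together")
#   print(f"Launched {len(job_ids)} runs: {job_ids}")
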
# TODO: factor the shared logic out of the two following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers with multiple runs per combination."""
    models_providers = load_models_providers(config_file)
    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")
    print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")

    launched_count = 0
    for model, provider in models_providers:
        job_ids = run_multiple_jobs(model, provider, tasks)
        if job_ids:
            launched_count += len(job_ids)

    # Save all results once after launching all jobs
    save_results()

    total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
    print(f"Launched {launched_count}/{total_expected} jobs successfully")
    return f"Launched {launched_count}/{total_expected} jobs ({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)"

def relaunch_failed_jobs():
    """Relaunch only failed model-provider combinations from job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"

    failed_jobs = [(key, info) for key, info in globals.job_results.items()
                   if info.get("status") in ["ERROR", "FAILED"]]
    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for key, info in failed_jobs:
        model = info["model"]
        provider = info["provider"]
        job_id = run_single_job(model, provider, globals.TASKS)
        if job_id is not None:
            relaunched_count += 1

    # Save all results once after relaunching all failed jobs
    save_results()
    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"

def update_job_statuses() -> None:
    """Check and update the status of active jobs and aggregate scores from multiple runs."""
    try:
        keys = list(globals.job_results.keys())
        for key in keys:
            try:
                with globals.results_lock:
                    runs = globals.job_results[key].get("runs", [])
                    if not runs:
                        # Legacy format - no runs list
                        continue

                    # Check the status of each run
                    all_completed = True
                    all_failed = True
                    any_running = False

                    for run in runs:
                        if run["status"] == "RUNNING":
                            # Check whether this run's job is still running
                            try:
                                job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
                                new_status = job_info.status.stage
                                if run["status"] != new_status:
                                    # Log the transition before overwriting the old status
                                    print(f"Run {run['run_number']} job {run['job_id']} status changed: {run['status']} -> {new_status}")
                                    run["status"] = new_status

                                if new_status == "COMPLETED":
                                    completed_time = datetime.now()
                                    run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")
                                    # Calculate duration
                                    if run.get("start_time"):
                                        start_time = datetime.fromisoformat(run["start_time"])
                                        run["duration"] = (completed_time - start_time).total_seconds()
                                    # Extract score
                                    score = extract_score_from_job(run["job_id"])
                                    if score is not None:
                                        run["score"] = score
                                        print(f"Run {run['run_number']}: extracted score {score:.4f}")
                            except Exception as e:
                                print(f"Error checking run {run['run_number']}: {e}")

                        # Update aggregate status flags
                        if run["status"] == "RUNNING":
                            any_running = True
                            all_completed = False
                            all_failed = False
                        elif run["status"] == "COMPLETED":
                            all_failed = False
                        elif run["status"] in ["ERROR", "FAILED"]:
                            all_completed = False

                    # Update overall status
                    if any_running:
                        globals.job_results[key]["status"] = "RUNNING"
                    elif all_completed:
                        globals.job_results[key]["status"] = "COMPLETED"

                        # Calculate aggregate statistics from completed runs
                        completed_scores = [run["score"] for run in runs if run["status"] == "COMPLETED" and run["score"] is not None]
                        completed_durations = [run["duration"] for run in runs if run["status"] == "COMPLETED" and run.get("duration") is not None]

                        if completed_scores:
                            mean_score = statistics.mean(completed_scores)
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0
                            globals.job_results[key]["current_score"] = mean_score
                            globals.job_results[key]["score_variance"] = variance
                            print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")

                        # Calculate average duration
                        if completed_durations:
                            mean_duration = statistics.mean(completed_durations)
                            globals.job_results[key]["duration"] = mean_duration
                            print(f"Average duration: {mean_duration:.2f} seconds")

                        # Update completion time to the latest run's
                        latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")], default=None)
                        if latest_completion:
                            globals.job_results[key]["completed_at"] = latest_completion
                    elif all_failed:
                        globals.job_results[key]["status"] = "ERROR"
            except Exception as e:
                print(f"Error checking job: {e}")
                traceback.print_exc()

        save_results()
    except Exception as e:
        print(f"Error in update_job_statuses: {e}")
        traceback.print_exc()

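# Minimal driver sketch, assuming this module is executed directly rather than
# imported by the Space app: launch every configured job, then poll statuses
# until nothing is left running. The 60-second polling interval is an
# arbitrary choice, not a requirement of the Jobs API.
if __name__ == "__main__":
    print(launch_jobs())
    while True:
        update_job_statuses()
        still_running = any(
            info.get("status") == "RUNNING" for info in globals.job_results.values()
        )
        if not still_running:
            break
        time.sleep(60)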