File size: 7,671 Bytes
8dafde0
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
096bf86
7f5506e
8dafde0
7f5506e
8dafde0
7f5506e
 
 
8dafde0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f5506e
 
 
 
 
 
 
 
 
 
32de2c3
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
 
32de2c3
 
 
 
 
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
8580b64
7f5506e
 
 
 
 
 
 
 
 
 
6e99f98
7f5506e
 
 
 
 
 
510a6cc
7f5506e
 
 
 
 
 
 
 
 
80d548a
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80d548a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f5506e
 
 
 
32de2c3
7f5506e
 
 
32de2c3
7f5506e
a791654
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8dafde0
 
 
 
 
 
7f5506e
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import re
import time
from datetime import datetime
import globals
from utils.io import save_results, load_models_providers
from typing import Optional


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract average score from completed job logs.

    Parses the results table and calculates the average of the main metric
    for each task (the metric on the same line as the task name).
    """
    try:
        # Inspect the job to get details and logs
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)

        scores = []

        for line in logs:
            # Find the results table
            # Look for lines that match the pattern: |task_name|version|metric|value|...|
            # We want to extract the score (value) from lines where the task name is not empty
            if '|' in line:
                parts = [p.strip() for p in line.split('|')]

                # Skip header and separator lines
                # Table format: | Task | Version | Metric | Value | | Stderr |
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _  = parts

                    # Is the task name correct
                    if task and task in [t.replace("|", ":") for t in globals.TASKS.split(",")]:
                        # Try to extract numeric value
                        # Remove any extra characters and convert to float
                        score = float(value)
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        # Calculate average of all task scores
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        else:
            print("No scores found in job logs")

        return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None


def run_single_job(model: str, provider: str, tasks: str = globals.TASKS) -> Optional[str]:
    """Run a single job for a model-provider combination."""

    if not model or not provider:
        print("Missing model or provider")
        return -1

    # Verify the model-provider combination exists in the config
    models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
    if (model, provider) not in models_providers:
        print( f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
        return -1

    # Check if job is already running
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        current_status = globals.job_results[key].get("status")
        if current_status == "RUNNING":
            print( f"Job for {model} on {provider} is already running. Please wait for it to complete.")
            return -1

    print(f"Starting job for model={model}, provider={provider}")

    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
            "--max-samples", "10"
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )

    job_id = job.id
    key = globals.get_model_provider_key(model, provider)

    with globals.results_lock:
        # Move current score to previous score if it exists (relaunching)
        previous_score = None
        if key in globals.job_results and globals.job_results[key].get("current_score", None) is not None:
            previous_score = globals.job_results[key]["current_score"]

        globals.job_results[key] = {
            "model": model,
            "provider": provider,
            "last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": "RUNNING",
            "current_score": None,
            "previous_score": previous_score,
            "job_id": job_id
        }

    save_results()
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
    return job_id

# Todo: factorize both following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers."""
    models_providers = load_models_providers(config_file)

    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")

    launched_count = 0
    for model, provider in models_providers:
        job_id = run_single_job(model, provider, tasks)
        if job_id != -1:
            launched_count += 1
        # Small delay between launches to avoid rate limiting
        time.sleep(2)

    print(f"Launched {launched_count}/{len(models_providers)} jobs successfully")
    return f"Launched {launched_count} jobs"

def relaunch_failed_jobs():
    """Relaunch only failed model-provider combinations from job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"

    failed_jobs = [(key, info) for key, info in globals.job_results.items()
                    if info.get("status") in ["ERROR", "FAILED"]]

    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for key, info in failed_jobs:
        model = info["model"]
        provider = info["provider"]
        job_id = run_single_job(model, provider, globals.TASKS)
        if job_id != -1:
            relaunched_count += 1
        time.sleep(2)  # Small delay between launches to avoid rate limiting

    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"



def update_job_statuses() -> None:
    """Check and update the status of active jobs."""
    try:
        keys = list(globals.job_results.keys())

        for key in keys:
            try:
                job_id = globals.job_results[key]["job_id"]

                job_info = inspect_job(job_id=job_id, namespace=globals.NAMESPACE)
                new_status = job_info.status.stage

                with globals.results_lock:
                    old_status = globals.job_results[key]["status"]

                    if old_status != new_status:
                        globals.job_results[key]["status"] = new_status
                        print(f"Job {job_id} status changed: {old_status} -> {new_status}")

                        # If job completed, try to extract score
                        if new_status == "COMPLETED":
                            score = extract_score_from_job(job_id)
                            if score is not None:
                                globals.job_results[key]["current_score"] = score

                    if new_status == "COMPLETED" and globals.job_results[key]["current_score"] is None:
                        score = extract_score_from_job(job_id)
                        if score is not None:
                            globals.job_results[key]["current_score"] = score


            except Exception as e:
                print(f"Error checking job: {str(e)}")

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")