import io import json import os import shlex import subprocess import traceback import threading import uuid import glob from typing import Dict, Any, List, Optional import requests import pandas as pd from fastapi import Request from huggingface_hub import HfApi, hf_hub_download, snapshot_download from loguru import logger from cachetools import TTLCache, cached from competitions.enums import SubmissionStatus from competitions.params import EvalParams from . import HF_URL USER_TOKEN = os.environ.get("USER_TOKEN") def token_information(token): if token.startswith("hf_oauth"): _api_url = HF_URL + "/oauth/userinfo" else: _api_url = HF_URL + "/api/whoami-v2" headers = {} cookies = {} if token.startswith("hf_"): headers["Authorization"] = f"Bearer {token}" else: cookies = {"token": token} try: response = requests.get( _api_url, headers=headers, cookies=cookies, timeout=10, # Increased timeout from 3 to 10 seconds ) except (requests.Timeout, ConnectionError) as err: logger.error(f"Failed to request whoami-v2 - {repr(err)}") raise Exception("Hugging Face Hub is unreachable, please try again later.") if response.status_code != 200: logger.error(f"Failed to request whoami-v2 - {response.status_code}") raise Exception("Invalid token.") resp = response.json() user_info = {} if token.startswith("hf_oauth"): user_info["id"] = resp["sub"] user_info["name"] = resp["preferred_username"] user_info["orgs"] = [resp["orgs"][k]["preferred_username"] for k in range(len(resp["orgs"]))] else: user_info["id"] = resp["id"] user_info["name"] = resp["name"] user_info["orgs"] = [resp["orgs"][k]["name"] for k in range(len(resp["orgs"]))] return user_info def user_authentication(request: Request): auth_header = request.headers.get("Authorization") bearer_token = None if auth_header and auth_header.startswith("Bearer "): bearer_token = auth_header.split(" ")[1] if bearer_token: try: _ = token_information(token=bearer_token) return bearer_token except Exception as e: logger.error(f"Failed to verify token: {e}") return None if USER_TOKEN is not None: try: _ = token_information(token=USER_TOKEN) return USER_TOKEN except Exception as e: logger.error(f"Failed to verify token: {e}") return None if "oauth_info" in request.session: try: _ = token_information(token=request.session["oauth_info"]["access_token"]) return request.session["oauth_info"]["access_token"] except Exception as e: request.session.pop("oauth_info", None) logger.error(f"Failed to verify token: {e}") return None return None def user_authentication_dep(token, return_raw=False): if token.startswith("hf_oauth"): _api_url = HF_URL + "/oauth/userinfo" else: _api_url = HF_URL + "/api/whoami-v2" headers = {} cookies = {} if token.startswith("hf_"): headers["Authorization"] = f"Bearer {token}" else: cookies = {"token": token} try: response = requests.get( _api_url, headers=headers, cookies=cookies, timeout=10, # Increased timeout from 3 to 10 seconds ) except (requests.Timeout, ConnectionError) as err: logger.error(f"Failed to request whoami-v2 - {repr(err)}") raise Exception("Hugging Face Hub is unreachable, please try again later.") resp = response.json() if return_raw: return resp user_info = {} if "error" in resp: return resp if token.startswith("hf_oauth"): user_info["id"] = resp["sub"] user_info["name"] = resp["preferred_username"] user_info["orgs"] = [resp["orgs"][k]["preferred_username"] for k in range(len(resp["orgs"]))] else: user_info["id"] = resp["id"] user_info["name"] = resp["name"] user_info["orgs"] = [resp["orgs"][k]["name"] for k in range(len(resp["orgs"]))] return user_info def make_clickable_user(user_id): link = "https://huggingface.co/" + user_id return f'{user_id}' def run_evaluation(params, local=False, wait=False): params = json.loads(params) if isinstance(params, str): params = json.loads(params) params = EvalParams(**params) if not local: params.output_path = "/tmp/model" params.save(output_dir=params.output_path) cmd = [ "python", "-m", "competitions.evaluate", "--config", os.path.join(params.output_path, "params.json"), ] cmd = [str(c) for c in cmd] logger.info(cmd) env = os.environ.copy() cmd = shlex.split(" ".join(cmd)) process = subprocess.Popen(cmd, env=env) if wait: process.wait() return process.pid def pause_space(params): if "SPACE_ID" in os.environ: if os.environ["SPACE_ID"].split("/")[-1].startswith("comp-"): logger.info("Pausing space...") api = HfApi(token=params.token) api.pause_space(repo_id=os.environ["SPACE_ID"]) def delete_space(params): if "SPACE_ID" in os.environ: if os.environ["SPACE_ID"].split("/")[-1].startswith("comp-"): logger.info("Deleting space...") api = HfApi(token=params.token) api.delete_repo(repo_id=os.environ["SPACE_ID"], repo_type="space") def download_submission_info(params): user_fname = hf_hub_download( repo_id=params.competition_id, filename=f"submission_info/{params.team_id}.json", token=params.token, repo_type="dataset", ) with open(user_fname, "r", encoding="utf-8") as f: user_submission_info = json.load(f) return user_submission_info def upload_submission_info(params, user_submission_info): user_submission_info_json = json.dumps(user_submission_info, indent=4) user_submission_info_json_bytes = user_submission_info_json.encode("utf-8") user_submission_info_json_buffer = io.BytesIO(user_submission_info_json_bytes) api = HfApi(token=params.token) api.upload_file( path_or_fileobj=user_submission_info_json_buffer, path_in_repo=f"submission_info/{params.team_id}.json", repo_id=params.competition_id, repo_type="dataset", ) def update_submission_status(params, status): user_submission_info = download_submission_info(params) for submission in user_submission_info["submissions"]: if submission["submission_id"] == params.submission_id: submission["status"] = status break upload_submission_info(params, user_submission_info) def update_submission_score(params, public_score, private_score): user_submission_info = download_submission_info(params) for submission in user_submission_info["submissions"]: if submission["submission_id"] == params.submission_id: submission["public_score"] = public_score submission["private_score"] = private_score submission["status"] = "done" break upload_submission_info(params, user_submission_info) def monitor(func): def wrapper(*args, **kwargs): params = kwargs.get("params", None) if params is None and len(args) > 0: params = args[0] try: return func(*args, **kwargs) except Exception as e: error_message = f"""{func.__name__} has failed due to an exception: {traceback.format_exc()}""" logger.error(error_message) logger.error(str(e)) update_submission_status(params, SubmissionStatus.FAILED.value) pause_space(params) return wrapper def uninstall_requirements(requirements_fname): if os.path.exists(requirements_fname): # read the requirements.txt uninstall_list = [] with open(requirements_fname, "r", encoding="utf-8") as f: for line in f: if line.startswith("-"): uninstall_list.append(line[1:]) # create an uninstall.txt with open("uninstall.txt", "w", encoding="utf-8") as f: for line in uninstall_list: f.write(line) pipe = subprocess.Popen( [ "pip", "uninstall", "-r", "uninstall.txt", "-y", ], ) pipe.wait() logger.info("Requirements uninstalled.") return def install_requirements(requirements_fname): # check if params.project_name has a requirements.txt if os.path.exists(requirements_fname): # install the requirements using subprocess, wait for it to finish install_list = [] with open(requirements_fname, "r", encoding="utf-8") as f: for line in f: # if line startswith - then skip but dont skip if line startswith -- if line.startswith("-"): if not line.startswith("--"): continue install_list.append(line) with open("install.txt", "w", encoding="utf-8") as f: for line in install_list: f.write(line) pipe = subprocess.Popen( [ "pip", "install", "-r", "install.txt", ], ) pipe.wait() logger.info("Requirements installed.") return logger.info("No requirements.txt found. Skipping requirements installation.") return def is_user_admin(user_token, competition_organization): user_info = token_information(token=user_token) user_orgs = user_info.get("orgs", []) for org in user_orgs: if org == competition_organization: return True return False def get_team_name(user_token, competition_id, hf_token): user_info = token_information(token=user_token) user_id = user_info["id"] user_team = hf_hub_download( repo_id=competition_id, filename="user_team.json", token=hf_token, repo_type="dataset", ) with open(user_team, "r", encoding="utf-8") as f: user_team = json.load(f) if user_id not in user_team: return None team_id = user_team[user_id] team_metadata = hf_hub_download( repo_id=competition_id, filename="teams.json", token=hf_token, repo_type="dataset", ) with open(team_metadata, "r", encoding="utf-8") as f: team_metadata = json.load(f) team_name = team_metadata[team_id]["name"] return team_name def update_team_name(user_token, new_team_name, competition_id, hf_token): user_info = token_information(token=user_token) user_id = user_info["id"] user_team = hf_hub_download( repo_id=competition_id, filename="user_team.json", token=hf_token, repo_type="dataset", ) with open(user_team, "r", encoding="utf-8") as f: user_team = json.load(f) if user_id not in user_team: raise Exception("User is not part of a team") team_id = user_team[user_id] team_metadata = hf_hub_download( repo_id=competition_id, filename="teams.json", token=hf_token, repo_type="dataset", ) with open(team_metadata, "r", encoding="utf-8") as f: team_metadata = json.load(f) team_metadata[team_id]["name"] = new_team_name team_metadata_json = json.dumps(team_metadata, indent=4) team_metadata_json_bytes = team_metadata_json.encode("utf-8") team_metadata_json_buffer = io.BytesIO(team_metadata_json_bytes) api = HfApi(token=hf_token) api.upload_file( path_or_fileobj=team_metadata_json_buffer, path_in_repo="teams.json", repo_id=competition_id, repo_type="dataset", ) return new_team_name class TeamAlreadyExistsError(Exception): """Custom exception for when a team already exists.""" pass class TeamFileApi: """ Team File Management API Class This class manages all team-related operations: - Create new teams - Query team information - Update team names - Manage team whitelist """ def __init__(self, hf_token: str, competition_id: str): self.hf_token = hf_token self.competition_id = competition_id self._lock = threading.Lock() # Thread lock to ensure thread safety for concurrent operations def _get_team_info(self, user_id: str) -> Optional[Dict[str, Any]]: """ Get team information by user ID """ user_team = hf_hub_download( repo_id=self.competition_id, filename="user_team.json", token=self.hf_token, repo_type="dataset", ) with open(user_team, "r", encoding="utf-8") as f: user_team = json.load(f) if user_id not in user_team: return None team_id = user_team[user_id] team_metadata = hf_hub_download( repo_id=self.competition_id, filename="teams.json", token=self.hf_token, repo_type="dataset", ) with open(team_metadata, "r", encoding="utf-8") as f: team_metadata = json.load(f) return team_metadata[team_id] def _create_team(self, user_id: str, team_name: str, other_data: Dict[str, Any]) -> str: """ Create new team (internal method) """ with self._lock: # Use lock to ensure thread safety user_team = hf_hub_download( repo_id=self.competition_id, filename="user_team.json", token=self.hf_token, repo_type="dataset", ) with open(user_team, "r", encoding="utf-8") as f: user_team = json.load(f) team_metadata = hf_hub_download( repo_id=self.competition_id, filename="teams.json", token=self.hf_token, repo_type="dataset", ) with open(team_metadata, "r", encoding="utf-8") as f: team_metadata = json.load(f) # Create new team ID team_id = str(uuid.uuid4()) user_team[user_id] = team_id # Create team metadata team_metadata[team_id] = { "id": team_id, "name": team_name, "members": [user_id], "leader": user_id, "other_data": other_data, } # Upload user-team mapping file user_team_json = json.dumps(user_team, indent=4) user_team_json_bytes = user_team_json.encode("utf-8") user_team_json_buffer = io.BytesIO(user_team_json_bytes) # Upload team metadata file team_metadata_json = json.dumps(team_metadata, indent=4) team_metadata_json_bytes = team_metadata_json.encode("utf-8") team_metadata_json_buffer = io.BytesIO(team_metadata_json_bytes) api = HfApi(token=self.hf_token) api.upload_file( path_or_fileobj=user_team_json_buffer, path_in_repo="user_team.json", repo_id=self.competition_id, repo_type="dataset", ) api.upload_file( path_or_fileobj=team_metadata_json_buffer, path_in_repo="teams.json", repo_id=self.competition_id, repo_type="dataset", ) team_submission_info = {} team_submission_info["id"] = team_id team_submission_info["submissions"] = [] team_submission_info_json = json.dumps(team_submission_info, indent=4) team_submission_info_json_bytes = team_submission_info_json.encode("utf-8") team_submission_info_json_buffer = io.BytesIO(team_submission_info_json_bytes) api.upload_file( path_or_fileobj=team_submission_info_json_buffer, path_in_repo=f"submission_info/{team_id}.json", repo_id=self.competition_id, repo_type="dataset", ) return team_id def create_team(self, user_token: str, team_name: str, other_data: Dict[str, Any]) -> str: """ Create team using user token (public interface) """ user_info = token_information(token=user_token) return self._create_team(user_info["id"], team_name, other_data) def get_team_info(self, user_token: str) -> Optional[Dict[str, Any]]: """ Get user's team information """ user_info = token_information(token=user_token) return self._get_team_info(user_info["id"]) def update_team_name(self, user_token, new_team_name): """ Update team name """ user_info = token_information(token=user_token) user_id = user_info["id"] team_info = self._get_team_info(user_id) with self._lock: team_metadata = hf_hub_download( repo_id=self.competition_id, filename="teams.json", token=self.hf_token, repo_type="dataset", ) with open(team_metadata, "r", encoding="utf-8") as f: team_metadata = json.load(f) team_metadata[team_info["id"]]["name"] = new_team_name team_metadata_json = json.dumps(team_metadata, indent=4) team_metadata_json_bytes = team_metadata_json.encode("utf-8") team_metadata_json_buffer = io.BytesIO(team_metadata_json_bytes) api = HfApi(token=self.hf_token) api.upload_file( path_or_fileobj=team_metadata_json_buffer, path_in_repo="teams.json", repo_id=self.competition_id, repo_type="dataset", ) return new_team_name @cached(cache=TTLCache(maxsize=1, ttl=60)) # Cache for 1 minute def get_team_white_list(self) -> List[str]: """ Get team whitelist (cached for 10 minutes) """ try: file = hf_hub_download( repo_id=self.competition_id, filename="team_id_whitelist.json", token=self.hf_token, repo_type="dataset", ) with open(file, "r", encoding="utf-8") as f: team_white_list = json.load(f) return team_white_list except: # Return empty list if whitelist file doesn't exist return [] # Create global team file API instance team_file_api = TeamFileApi( os.environ.get("HF_TOKEN", None), os.environ.get("COMPETITION_ID"), ) class LeaderboardApi: """ Simplified leaderboard API class, similar to reference code design Automatically displays the best score for each team without manual selection """ def __init__(self, hf_token: str, competition_id: str): self.hf_token = hf_token self.competition_id = competition_id self.api = HfApi(token=hf_token) @cached(cache=TTLCache(maxsize=1, ttl=300)) # Cache for 5 minutes def get_leaderboard(self) -> pd.DataFrame: """ Get leaderboard data Returns: pd.DataFrame: Leaderboard DataFrame """ all_scores = self._get_all_scores() if not all_scores: return pd.DataFrame() df = pd.DataFrame(all_scores) # Keep only the best score for each team (data already processed in _get_all_scores) # Sort by extended_pdm_score_combined in descending order if "extended_pdm_score_combined" in df.columns: df = df.sort_values(by=["extended_pdm_score_combined"], ascending=[False]) # Add ranking df = df.reset_index(drop=True) df['rank'] = range(1, len(df) + 1) # Keep 4 decimal places score_columns = [col for col in df.columns if col not in ['team_id', 'team_name', 'submission_datetime', 'rank']] for col in score_columns: if df[col].dtype in ['float64', 'float32', 'int64', 'int32']: df[col] = df[col].round(4) # Reorder columns: rank, team_name, all score columns, submission_datetime available_columns = ['rank', 'team_name'] + [col for col in score_columns if col in df.columns] + ['submission_datetime'] df = df[available_columns] # Format submission_datetime if 'submission_datetime' in df.columns: df['submission_datetime'] = pd.to_datetime(df['submission_datetime']).dt.strftime('%Y-%m-%d %H:%M:%S') return df def _get_all_scores(self) -> List[Dict[str, Any]]: """Get score data for all teams""" try: # Download team metadata team_metadata_path = hf_hub_download( repo_id=self.competition_id, filename="teams.json", token=self.hf_token, repo_type="dataset", ) with open(team_metadata_path, "r", encoding="utf-8") as f: team_metadata = json.load(f) # Download all submission information submissions_folder = snapshot_download( repo_id=self.competition_id, allow_patterns="submission_info/*.json", token=self.hf_token, repo_type="dataset", ) submission_jsons = glob.glob(os.path.join(submissions_folder, "submission_info/*.json")) all_scores = [] for json_path in submission_jsons: with open(json_path, "r", encoding="utf-8") as f: submission_data = json.load(f) team_id = submission_data["id"] # Check if team exists in metadata if team_id not in team_metadata: logger.warning(f"Team {team_id} not found in team metadata, skipping...") continue # Collect all successful submissions for this team team_submissions = [] for sub in submission_data["submissions"]: if sub["status"] != SubmissionStatus.SUCCESS.value: continue # Check if score data exists if "public_score" not in sub: continue # Create a record containing all scores score_record = { "team_id": team_id, "team_name": team_metadata[team_id]["name"], "submission_datetime": sub["datetime"] } # Expand all public_score key-value pairs for score_key, score_value in sub["public_score"].items(): score_record[score_key] = score_value team_submissions.append(score_record) # Select best submission for this team (based on extended_pdm_score_combined) if team_submissions: # Sort by extended_pdm_score_combined in descending order, take the best if "extended_pdm_score_combined" in team_submissions[0]: team_submissions.sort(key=lambda x: x["extended_pdm_score_combined"], reverse=True) best_submission = team_submissions[0] all_scores.append(best_submission) return all_scores except Exception as e: logger.error(f"Error getting leaderboard scores: {e}") return [] # Create global leaderboard_api instance leaderboard_api = LeaderboardApi( hf_token=os.environ.get("HF_TOKEN", None), competition_id=os.environ.get("COMPETITION_ID") )