"""Utility helpers for Hugging Face competition Spaces: authentication, submissions, teams, and leaderboard."""
import functools
import glob
import io
import json
import os
import shlex
import subprocess
import threading
import traceback
import uuid
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
from cachetools import TTLCache, cached
from fastapi import Request
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from loguru import logger

from competitions.enums import SubmissionStatus
from competitions.params import EvalParams

from . import HF_URL
# Optional server-side token read once at import time; when set it can
# authenticate requests without a user-supplied bearer token
# (see user_authentication).
USER_TOKEN = os.environ.get("USER_TOKEN")
def token_information(token):
    """Resolve a Hugging Face token to the owning user's id, name and orgs.

    OAuth tokens (``hf_oauth...``) are resolved via the OAuth userinfo
    endpoint; all other tokens go through ``whoami-v2``. Plain ``hf_``
    tokens are sent as a Bearer header, anything else as a session cookie.

    Raises:
        Exception: if the Hub is unreachable or the token is invalid.
    """
    if token.startswith("hf_oauth"):
        _api_url = HF_URL + "/oauth/userinfo"
    else:
        _api_url = HF_URL + "/api/whoami-v2"
    headers = {}
    cookies = {}
    if token.startswith("hf_"):
        headers["Authorization"] = f"Bearer {token}"
    else:
        cookies = {"token": token}
    try:
        response = requests.get(
            _api_url,
            headers=headers,
            cookies=cookies,
            timeout=10,  # Increased timeout from 3 to 10 seconds
        )
    # BUG FIX: requests.ConnectionError does NOT inherit from the builtin
    # ConnectionError, so connection failures previously escaped this
    # handler instead of raising the friendly message below.
    except (requests.Timeout, requests.ConnectionError) as err:
        logger.error(f"Failed to request whoami-v2 - {repr(err)}")
        raise Exception("Hugging Face Hub is unreachable, please try again later.")
    if response.status_code != 200:
        logger.error(f"Failed to request whoami-v2 - {response.status_code}")
        raise Exception("Invalid token.")
    resp = response.json()
    user_info = {}
    if token.startswith("hf_oauth"):
        user_info["id"] = resp["sub"]
        user_info["name"] = resp["preferred_username"]
        user_info["orgs"] = [org["preferred_username"] for org in resp["orgs"]]
    else:
        user_info["id"] = resp["id"]
        user_info["name"] = resp["name"]
        user_info["orgs"] = [org["name"] for org in resp["orgs"]]
    return user_info
def user_authentication(request: Request):
    """Return a verified user token for *request*, or ``None``.

    Sources are checked in priority order: the ``Authorization: Bearer``
    header, the ``USER_TOKEN`` environment variable, then the session's
    OAuth access token. Each candidate is validated via
    ``token_information`` before being returned.
    """
    auth_header = request.headers.get("Authorization")
    bearer_token = None
    if auth_header and auth_header.startswith("Bearer "):
        bearer_token = auth_header.split(" ")[1]
    if bearer_token:
        try:
            token_information(token=bearer_token)
            return bearer_token
        except Exception as e:
            logger.error(f"Failed to verify token: {e}")
            return None
    if USER_TOKEN is not None:
        try:
            token_information(token=USER_TOKEN)
            return USER_TOKEN
        except Exception as e:
            logger.error(f"Failed to verify token: {e}")
            return None
    if "oauth_info" in request.session:
        try:
            oauth_token = request.session["oauth_info"]["access_token"]
            token_information(token=oauth_token)
            return oauth_token
        except Exception as e:
            # Invalid/stale OAuth data: drop it from the session.
            request.session.pop("oauth_info", None)
            logger.error(f"Failed to verify token: {e}")
            return None
    return None
def user_authentication_dep(token, return_raw=False):
    """Resolve *token* to user info without raising on HTTP errors.

    Unlike :func:`token_information`, a non-200 response is not fatal:
    the parsed JSON (which may contain an ``"error"`` key) is returned
    as-is. With ``return_raw=True`` the raw JSON is always returned.

    Raises:
        Exception: only if the Hub is unreachable.
    """
    if token.startswith("hf_oauth"):
        _api_url = HF_URL + "/oauth/userinfo"
    else:
        _api_url = HF_URL + "/api/whoami-v2"
    headers = {}
    cookies = {}
    if token.startswith("hf_"):
        headers["Authorization"] = f"Bearer {token}"
    else:
        cookies = {"token": token}
    try:
        response = requests.get(
            _api_url,
            headers=headers,
            cookies=cookies,
            timeout=10,  # Increased timeout from 3 to 10 seconds
        )
    # BUG FIX: requests.ConnectionError does NOT inherit from the builtin
    # ConnectionError, so connection failures previously escaped this
    # handler instead of raising the friendly message below.
    except (requests.Timeout, requests.ConnectionError) as err:
        logger.error(f"Failed to request whoami-v2 - {repr(err)}")
        raise Exception("Hugging Face Hub is unreachable, please try again later.")
    resp = response.json()
    if return_raw:
        return resp
    user_info = {}
    if "error" in resp:
        return resp
    if token.startswith("hf_oauth"):
        user_info["id"] = resp["sub"]
        user_info["name"] = resp["preferred_username"]
        user_info["orgs"] = [org["preferred_username"] for org in resp["orgs"]]
    else:
        user_info["id"] = resp["id"]
        user_info["name"] = resp["name"]
        user_info["orgs"] = [org["name"] for org in resp["orgs"]]
    return user_info
def make_clickable_user(user_id):
    """Wrap *user_id* in an HTML anchor linking to their HF profile."""
    profile_url = "https://huggingface.co/" + user_id
    return f'<a target="_blank" href="{profile_url}">{user_id}</a>'
def run_evaluation(params, local=False, wait=False):
    """Launch ``competitions.evaluate`` as a subprocess and return its PID.

    Args:
        params: JSON string (possibly double-encoded) describing
            ``EvalParams``.
        local: when False, output is forced to ``/tmp/model``.
        wait: when True, block until the evaluation process exits.

    Returns:
        int: the spawned process id.
    """
    params = json.loads(params)
    if isinstance(params, str):
        # Some callers double-encode the payload; decode a second time.
        params = json.loads(params)
    params = EvalParams(**params)
    if not local:
        params.output_path = "/tmp/model"
    params.save(output_dir=params.output_path)
    cmd = [
        "python",
        "-m",
        "competitions.evaluate",
        "--config",
        os.path.join(params.output_path, "params.json"),
    ]
    cmd = [str(c) for c in cmd]
    logger.info(cmd)
    env = os.environ.copy()
    # BUG FIX: the original re-joined the argv list and shlex.split() it
    # again, which corrupts any element containing spaces (e.g. a config
    # path). Popen takes the list directly.
    process = subprocess.Popen(cmd, env=env)
    if wait:
        process.wait()
    return process.pid
def pause_space(params):
    """Pause the currently running Space, but only if it is a ``comp-*`` Space."""
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Pausing space...")
    HfApi(token=params.token).pause_space(repo_id=space_id)
def delete_space(params):
    """Delete the currently running Space, but only if it is a ``comp-*`` Space."""
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Deleting space...")
    HfApi(token=params.token).delete_repo(repo_id=space_id, repo_type="space")
def download_submission_info(params):
    """Download and parse the team's submission-info JSON from the competition dataset."""
    local_path = hf_hub_download(
        repo_id=params.competition_id,
        filename=f"submission_info/{params.team_id}.json",
        token=params.token,
        repo_type="dataset",
    )
    with open(local_path, "r", encoding="utf-8") as fp:
        return json.load(fp)
def upload_submission_info(params, user_submission_info):
    """Serialize *user_submission_info* and upload it back to the competition dataset."""
    payload = json.dumps(user_submission_info, indent=4).encode("utf-8")
    HfApi(token=params.token).upload_file(
        path_or_fileobj=io.BytesIO(payload),
        path_in_repo=f"submission_info/{params.team_id}.json",
        repo_id=params.competition_id,
        repo_type="dataset",
    )
def update_submission_status(params, status):
    """Set the status of the submission identified by ``params.submission_id``."""
    info = download_submission_info(params)
    for entry in info["submissions"]:
        if entry["submission_id"] == params.submission_id:
            entry["status"] = status
            break
    upload_submission_info(params, info)
def update_submission_score(params, public_score, private_score):
    """Record public/private scores for the submission and mark it done."""
    info = download_submission_info(params)
    for entry in info["submissions"]:
        if entry["submission_id"] == params.submission_id:
            entry["public_score"] = public_score
            entry["private_score"] = private_score
            # NOTE(review): literal "done" here while other code uses the
            # SubmissionStatus enum — confirm the values agree.
            entry["status"] = "done"
            break
    upload_submission_info(params, info)
def monitor(func):
    """Decorator for evaluation entry points: on any exception, log the
    traceback, mark the submission FAILED and pause the Space.

    ``params`` is taken from the ``params`` keyword argument or, failing
    that, the first positional argument. On failure the wrapper returns
    ``None`` (the exception is not re-raised).
    """
    # BUG FIX: without functools.wraps the wrapper clobbered the wrapped
    # function's __name__/__doc__, making the error message above and any
    # introspection report "wrapper" instead of the real function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        params = kwargs.get("params", None)
        if params is None and len(args) > 0:
            params = args[0]
        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_message = f"""{func.__name__} has failed due to an exception: {traceback.format_exc()}"""
            logger.error(error_message)
            logger.error(str(e))
            update_submission_status(params, SubmissionStatus.FAILED.value)
            pause_space(params)
    return wrapper
def uninstall_requirements(requirements_fname):
    """Uninstall packages marked with a leading ``-`` in *requirements_fname*.

    Lines beginning with a single ``-`` name packages to remove; lines
    beginning with ``--`` are pip options (kept by install_requirements)
    and are ignored here. No-op if the file does not exist.
    """
    if not os.path.exists(requirements_fname):
        return
    uninstall_list = []
    with open(requirements_fname, "r", encoding="utf-8") as f:
        for line in f:
            # BUG FIX: "--option" lines are pip options, not "-<package>"
            # removal markers; the original stripped one dash and asked
            # pip to uninstall "-option".
            if line.startswith("-") and not line.startswith("--"):
                uninstall_list.append(line[1:])
    if not uninstall_list:
        # Nothing marked for removal; skip the pip subprocess entirely.
        return
    with open("uninstall.txt", "w", encoding="utf-8") as f:
        f.writelines(uninstall_list)
    pipe = subprocess.Popen(
        [
            "pip",
            "uninstall",
            "-r",
            "uninstall.txt",
            "-y",
        ],
    )
    pipe.wait()
    logger.info("Requirements uninstalled.")
    return
def install_requirements(requirements_fname):
    """Install a filtered copy of *requirements_fname* with pip.

    Single-dash lines (uninstall markers, handled by
    uninstall_requirements) are dropped; ``--`` option lines and plain
    requirement lines are written to ``install.txt`` and installed.
    Logs and returns if the file does not exist.
    """
    if not os.path.exists(requirements_fname):
        logger.info("No requirements.txt found. Skipping requirements installation.")
        return
    install_list = []
    with open(requirements_fname, "r", encoding="utf-8") as src:
        for line in src:
            # Keep everything except single-dash uninstall markers;
            # double-dash lines are pip options and must survive.
            if line.startswith("-") and not line.startswith("--"):
                continue
            install_list.append(line)
    with open("install.txt", "w", encoding="utf-8") as dst:
        dst.writelines(install_list)
    pipe = subprocess.Popen(["pip", "install", "-r", "install.txt"])
    pipe.wait()
    logger.info("Requirements installed.")
def is_user_admin(user_token, competition_organization):
    """Return True if the token's user is a member of *competition_organization*."""
    user_info = token_information(token=user_token)
    return competition_organization in user_info.get("orgs", [])
def get_team_name(user_token, competition_id, hf_token):
    """Return the team name for the token's user, or None if they have no team."""
    user_id = token_information(token=user_token)["id"]
    mapping_path = hf_hub_download(
        repo_id=competition_id,
        filename="user_team.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(mapping_path, "r", encoding="utf-8") as fp:
        user_team = json.load(fp)
    if user_id not in user_team:
        return None
    team_id = user_team[user_id]
    teams_path = hf_hub_download(
        repo_id=competition_id,
        filename="teams.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(teams_path, "r", encoding="utf-8") as fp:
        teams = json.load(fp)
    return teams[team_id]["name"]
def update_team_name(user_token, new_team_name, competition_id, hf_token):
    """Rename the team of the user identified by *user_token*.

    Returns:
        str: *new_team_name* on success.

    Raises:
        Exception: if the user does not belong to any team.
    """
    user_id = token_information(token=user_token)["id"]
    mapping_path = hf_hub_download(
        repo_id=competition_id,
        filename="user_team.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(mapping_path, "r", encoding="utf-8") as fp:
        user_team = json.load(fp)
    if user_id not in user_team:
        raise Exception("User is not part of a team")
    team_id = user_team[user_id]
    teams_path = hf_hub_download(
        repo_id=competition_id,
        filename="teams.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(teams_path, "r", encoding="utf-8") as fp:
        teams = json.load(fp)
    teams[team_id]["name"] = new_team_name
    payload = json.dumps(teams, indent=4).encode("utf-8")
    HfApi(token=hf_token).upload_file(
        path_or_fileobj=io.BytesIO(payload),
        path_in_repo="teams.json",
        repo_id=competition_id,
        repo_type="dataset",
    )
    return new_team_name
class TeamAlreadyExistsError(Exception):
    """Raised when attempting to create a team that already exists."""
class TeamFileApi: | |
""" | |
Team File Management API Class | |
This class manages all team-related operations: | |
- Create new teams | |
- Query team information | |
- Update team names | |
- Manage team whitelist | |
""" | |
def __init__(self, hf_token: str, competition_id: str): | |
self.hf_token = hf_token | |
self.competition_id = competition_id | |
self._lock = threading.Lock() # Thread lock to ensure thread safety for concurrent operations | |
def _get_team_info(self, user_id: str) -> Optional[Dict[str, Any]]: | |
""" | |
Get team information by user ID | |
""" | |
user_team = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="user_team.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(user_team, "r", encoding="utf-8") as f: | |
user_team = json.load(f) | |
if user_id not in user_team: | |
return None | |
team_id = user_team[user_id] | |
team_metadata = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="teams.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(team_metadata, "r", encoding="utf-8") as f: | |
team_metadata = json.load(f) | |
return team_metadata[team_id] | |
def _create_team(self, user_id: str, team_name: str, other_data: Dict[str, Any]) -> str: | |
""" | |
Create new team (internal method) | |
""" | |
with self._lock: # Use lock to ensure thread safety | |
user_team = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="user_team.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(user_team, "r", encoding="utf-8") as f: | |
user_team = json.load(f) | |
team_metadata = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="teams.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(team_metadata, "r", encoding="utf-8") as f: | |
team_metadata = json.load(f) | |
# Create new team ID | |
team_id = str(uuid.uuid4()) | |
user_team[user_id] = team_id | |
# Create team metadata | |
team_metadata[team_id] = { | |
"id": team_id, | |
"name": team_name, | |
"members": [user_id], | |
"leader": user_id, | |
"other_data": other_data, | |
} | |
# Upload user-team mapping file | |
user_team_json = json.dumps(user_team, indent=4) | |
user_team_json_bytes = user_team_json.encode("utf-8") | |
user_team_json_buffer = io.BytesIO(user_team_json_bytes) | |
# Upload team metadata file | |
team_metadata_json = json.dumps(team_metadata, indent=4) | |
team_metadata_json_bytes = team_metadata_json.encode("utf-8") | |
team_metadata_json_buffer = io.BytesIO(team_metadata_json_bytes) | |
api = HfApi(token=self.hf_token) | |
api.upload_file( | |
path_or_fileobj=user_team_json_buffer, | |
path_in_repo="user_team.json", | |
repo_id=self.competition_id, | |
repo_type="dataset", | |
) | |
api.upload_file( | |
path_or_fileobj=team_metadata_json_buffer, | |
path_in_repo="teams.json", | |
repo_id=self.competition_id, | |
repo_type="dataset", | |
) | |
team_submission_info = {} | |
team_submission_info["id"] = team_id | |
team_submission_info["submissions"] = [] | |
team_submission_info_json = json.dumps(team_submission_info, indent=4) | |
team_submission_info_json_bytes = team_submission_info_json.encode("utf-8") | |
team_submission_info_json_buffer = io.BytesIO(team_submission_info_json_bytes) | |
api.upload_file( | |
path_or_fileobj=team_submission_info_json_buffer, | |
path_in_repo=f"submission_info/{team_id}.json", | |
repo_id=self.competition_id, | |
repo_type="dataset", | |
) | |
return team_id | |
def create_team(self, user_token: str, team_name: str, other_data: Dict[str, Any]) -> str: | |
""" | |
Create team using user token (public interface) | |
""" | |
user_info = token_information(token=user_token) | |
return self._create_team(user_info["id"], team_name, other_data) | |
def get_team_info(self, user_token: str) -> Optional[Dict[str, Any]]: | |
""" | |
Get user's team information | |
""" | |
user_info = token_information(token=user_token) | |
return self._get_team_info(user_info["id"]) | |
def update_team_name(self, user_token, new_team_name): | |
""" | |
Update team name | |
""" | |
user_info = token_information(token=user_token) | |
user_id = user_info["id"] | |
team_info = self._get_team_info(user_id) | |
with self._lock: | |
team_metadata = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="teams.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(team_metadata, "r", encoding="utf-8") as f: | |
team_metadata = json.load(f) | |
team_metadata[team_info["id"]]["name"] = new_team_name | |
team_metadata_json = json.dumps(team_metadata, indent=4) | |
team_metadata_json_bytes = team_metadata_json.encode("utf-8") | |
team_metadata_json_buffer = io.BytesIO(team_metadata_json_bytes) | |
api = HfApi(token=self.hf_token) | |
api.upload_file( | |
path_or_fileobj=team_metadata_json_buffer, | |
path_in_repo="teams.json", | |
repo_id=self.competition_id, | |
repo_type="dataset", | |
) | |
return new_team_name | |
# Cache for 1 minute | |
def get_team_white_list(self) -> List[str]: | |
""" | |
Get team whitelist (cached for 10 minutes) | |
""" | |
try: | |
file = hf_hub_download( | |
repo_id=self.competition_id, | |
filename="team_id_whitelist.json", | |
token=self.hf_token, | |
repo_type="dataset", | |
) | |
with open(file, "r", encoding="utf-8") as f: | |
team_white_list = json.load(f) | |
return team_white_list | |
except: | |
# Return empty list if whitelist file doesn't exist | |
return [] | |
# Module-level TeamFileApi singleton, configured from the environment.
team_file_api = TeamFileApi(
    hf_token=os.environ.get("HF_TOKEN", None),
    competition_id=os.environ.get("COMPETITION_ID"),
)
class LeaderboardApi:
    """
    Simplified leaderboard API class.

    Builds the leaderboard from the competition dataset's submission
    files, automatically keeping each team's best successful submission
    (ranked by ``extended_pdm_score_combined``) without manual selection.
    """

    def __init__(self, hf_token: str, competition_id: str):
        self.hf_token = hf_token
        self.competition_id = competition_id
        self.api = HfApi(token=hf_token)

    # NOTE(review): an earlier comment claimed this was cached for 5
    # minutes; no caching is actually applied.
    def get_leaderboard(self) -> pd.DataFrame:
        """
        Get leaderboard data.

        Returns:
            pd.DataFrame: columns ``rank``, ``team_name``, one column per
            public-score key, and ``submission_datetime`` (formatted as
            ``%Y-%m-%d %H:%M:%S``). Empty DataFrame when no scores exist.
        """
        all_scores = self._get_all_scores()
        if not all_scores:
            return pd.DataFrame()
        df = pd.DataFrame(all_scores)
        # Each team already contributes only its best score (selected in
        # _get_all_scores); sort best-first.
        if "extended_pdm_score_combined" in df.columns:
            df = df.sort_values(by=["extended_pdm_score_combined"], ascending=[False])
        # Add ranking 1..n in sorted order.
        df = df.reset_index(drop=True)
        df["rank"] = range(1, len(df) + 1)
        # Round numeric score columns to 4 decimal places.
        score_columns = [
            col
            for col in df.columns
            if col not in ["team_id", "team_name", "submission_datetime", "rank"]
        ]
        for col in score_columns:
            if df[col].dtype in ["float64", "float32", "int64", "int32"]:
                df[col] = df[col].round(4)
        # Reorder columns: rank, team_name, score columns, submission_datetime.
        available_columns = (
            ["rank", "team_name"]
            + [col for col in score_columns if col in df.columns]
            + ["submission_datetime"]
        )
        df = df[available_columns]
        # Format submission_datetime for display.
        if "submission_datetime" in df.columns:
            df["submission_datetime"] = pd.to_datetime(df["submission_datetime"]).dt.strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        return df

    def _get_all_scores(self) -> List[Dict[str, Any]]:
        """Get the best successful-submission record for every team.

        Returns [] on any error (missing files, bad JSON, network).
        """
        try:
            # Download team metadata
            team_metadata_path = hf_hub_download(
                repo_id=self.competition_id,
                filename="teams.json",
                token=self.hf_token,
                repo_type="dataset",
            )
            with open(team_metadata_path, "r", encoding="utf-8") as f:
                team_metadata = json.load(f)
            # Download all submission information
            submissions_folder = snapshot_download(
                repo_id=self.competition_id,
                allow_patterns="submission_info/*.json",
                token=self.hf_token,
                repo_type="dataset",
            )
            submission_jsons = glob.glob(os.path.join(submissions_folder, "submission_info/*.json"))
            all_scores = []
            for json_path in submission_jsons:
                with open(json_path, "r", encoding="utf-8") as f:
                    submission_data = json.load(f)
                team_id = submission_data["id"]
                # Skip submission files whose team is unknown.
                if team_id not in team_metadata:
                    logger.warning(f"Team {team_id} not found in team metadata, skipping...")
                    continue
                # Collect all successful, scored submissions for this team.
                team_submissions = []
                for sub in submission_data["submissions"]:
                    if sub["status"] != SubmissionStatus.SUCCESS.value:
                        continue
                    if "public_score" not in sub:
                        continue
                    score_record = {
                        "team_id": team_id,
                        "team_name": team_metadata[team_id]["name"],
                        "submission_datetime": sub["datetime"],
                    }
                    # Flatten every public_score key into the record.
                    for score_key, score_value in sub["public_score"].items():
                        score_record[score_key] = score_value
                    team_submissions.append(score_record)
                # Keep only the best submission per team (highest
                # extended_pdm_score_combined, when that key is present).
                if team_submissions:
                    if "extended_pdm_score_combined" in team_submissions[0]:
                        team_submissions.sort(
                            key=lambda x: x["extended_pdm_score_combined"], reverse=True
                        )
                    all_scores.append(team_submissions[0])
            return all_scores
        except Exception as e:
            logger.error(f"Error getting leaderboard scores: {e}")
            return []
# Module-level LeaderboardApi singleton, configured from the environment.
leaderboard_api = LeaderboardApi(
    os.environ.get("HF_TOKEN", None),
    os.environ.get("COMPETITION_ID"),
)