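# Page header captured with the source (not code): avatar caption "GihhArwtw's picture"
# Latest commit message: [fix] fix utils.teamFileApi._create_team
# Latest commit hash: 8deda7a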
import io
import json
import os
import shlex
import subprocess
import traceback
import threading
import uuid
import glob
from typing import Dict, Any, List, Optional
import requests
import pandas as pd
from fastapi import Request
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from loguru import logger
from cachetools import TTLCache, cached
from competitions.enums import SubmissionStatus
from competitions.params import EvalParams
from . import HF_URL
# Optional token read from the environment; None when the variable is unset.
USER_TOKEN = os.getenv("USER_TOKEN")
def token_information(token):
    """Resolve a Hugging Face token into basic user information.

    Tokens starting with ``hf_oauth`` are checked against the
    ``/oauth/userinfo`` endpoint; every other token is checked against
    ``/api/whoami-v2``. Tokens starting with ``hf_`` are sent as a bearer
    ``Authorization`` header, anything else is sent as a ``token`` cookie.

    Args:
        token: Token string to verify.

    Returns:
        A dict with the keys ``id``, ``name`` and ``orgs`` (list of
        organization names).

    Raises:
        Exception: If the Hub cannot be reached (timeout or connection
            error) or answers with a non-200 status code.
    """
    is_oauth = token.startswith("hf_oauth")
    _api_url = HF_URL + ("/oauth/userinfo" if is_oauth else "/api/whoami-v2")

    headers = {}
    cookies = {}
    if token.startswith("hf_"):
        headers["Authorization"] = f"Bearer {token}"
    else:
        cookies = {"token": token}

    try:
        response = requests.get(
            _api_url,
            headers=headers,
            cookies=cookies,
            timeout=10,  # Increased timeout from 3 to 10 seconds
        )
    # requests.ConnectionError does not derive from the builtin ConnectionError,
    # so it has to be listed explicitly; the builtin stays for compatibility.
    except (requests.Timeout, requests.ConnectionError, ConnectionError) as err:
        logger.error(f"Failed to request whoami-v2 - {repr(err)}")
        raise Exception("Hugging Face Hub is unreachable, please try again later.") from err

    if response.status_code != 200:
        logger.error(f"Failed to request whoami-v2 - {response.status_code}")
        raise Exception("Invalid token.")

    resp = response.json()
    # The OAuth endpoint and whoami-v2 use different field names.
    if is_oauth:
        id_key, name_key = "sub", "preferred_username"
    else:
        id_key, name_key = "id", "name"

    return {
        "id": resp[id_key],
        "name": resp[name_key],
        "orgs": [org[name_key] for org in resp["orgs"]],
    }
def user_authentication(request: Request):
    """Return a verified token for the incoming request, or None.

    Sources are tried in this order: the ``Authorization: Bearer`` header,
    the module-level ``USER_TOKEN``, then the ``oauth_info`` entry of the
    session. The first source that is present decides the outcome: if its
    verification fails, None is returned without trying later sources.
    """
    header_value = request.headers.get("Authorization")
    if header_value and header_value.startswith("Bearer "):
        bearer_token = header_value.split(" ")[1]
    else:
        bearer_token = None

    # 1) Explicit bearer token from the request header.
    if bearer_token:
        try:
            token_information(token=bearer_token)
        except Exception as e:
            logger.error(f"Failed to verify token: {e}")
            return None
        return bearer_token

    # 2) Token configured through the environment.
    if USER_TOKEN is not None:
        try:
            token_information(token=USER_TOKEN)
        except Exception as e:
            logger.error(f"Failed to verify token: {e}")
            return None
        return USER_TOKEN

    # 3) OAuth token stored in the session; dropped from the session when invalid.
    if "oauth_info" in request.session:
        try:
            session_token = request.session["oauth_info"]["access_token"]
            token_information(token=session_token)
        except Exception as e:
            request.session.pop("oauth_info", None)
            logger.error(f"Failed to verify token: {e}")
            return None
        return session_token

    return None
def user_authentication_dep(token, return_raw=False):
    """Query the Hub for the user behind ``token`` without checking the HTTP status.

    Endpoint and credential placement follow the token prefix: ``hf_oauth``
    tokens use ``/oauth/userinfo``, all others ``/api/whoami-v2``; tokens
    starting with ``hf_`` are sent as a bearer header, anything else as a
    ``token`` cookie.

    Args:
        token: Token string to look up.
        return_raw: When True, return the decoded JSON response unchanged.

    Returns:
        The raw JSON response when ``return_raw`` is True or when the response
        contains an ``error`` key; otherwise a dict with ``id``, ``name`` and
        ``orgs``.

    Raises:
        Exception: If the Hub cannot be reached (timeout or connection error).
    """
    is_oauth = token.startswith("hf_oauth")
    _api_url = HF_URL + ("/oauth/userinfo" if is_oauth else "/api/whoami-v2")

    headers = {}
    cookies = {}
    if token.startswith("hf_"):
        headers["Authorization"] = f"Bearer {token}"
    else:
        cookies = {"token": token}

    try:
        response = requests.get(
            _api_url,
            headers=headers,
            cookies=cookies,
            timeout=10,  # Increased timeout from 3 to 10 seconds
        )
    # requests.ConnectionError does not derive from the builtin ConnectionError,
    # so it has to be listed explicitly; the builtin stays for compatibility.
    except (requests.Timeout, requests.ConnectionError, ConnectionError) as err:
        logger.error(f"Failed to request whoami-v2 - {repr(err)}")
        raise Exception("Hugging Face Hub is unreachable, please try again later.") from err

    resp = response.json()
    if return_raw:
        return resp
    if "error" in resp:
        # Error payloads are handed back to the caller untouched.
        return resp

    # The OAuth endpoint and whoami-v2 use different field names.
    if is_oauth:
        id_key, name_key = "sub", "preferred_username"
    else:
        id_key, name_key = "id", "name"

    return {
        "id": resp[id_key],
        "name": resp[name_key],
        "orgs": [org[name_key] for org in resp["orgs"]],
    }
def make_clickable_user(user_id):
    """Build an HTML anchor that opens the user's Hugging Face page in a new tab."""
    profile_url = "https://huggingface.co/" + user_id
    return '<a target="_blank" href="{link}">{user_id}</a>'.format(
        link=profile_url,
        user_id=user_id,
    )
def run_evaluation(params, local=False, wait=False):
    """Start ``python -m competitions.evaluate`` in a subprocess.

    Args:
        params: JSON string with the ``EvalParams`` fields. A JSON string that
            itself decodes to a JSON string (double encoding) is decoded twice.
        local: When False, the output path is forced to ``/tmp/model``.
        wait: When True, block until the subprocess has exited.

    Returns:
        The PID of the spawned process.
    """
    params = json.loads(params)
    if isinstance(params, str):
        params = json.loads(params)
    params = EvalParams(**params)
    if not local:
        params.output_path = "/tmp/model"
    params.save(output_dir=params.output_path)

    cmd = [
        "python",
        "-m",
        "competitions.evaluate",
        "--config",
        os.path.join(params.output_path, "params.json"),
    ]
    cmd = [str(c) for c in cmd]
    logger.info(cmd)

    env = os.environ.copy()
    # Hand the argv list straight to Popen. Joining it into one string and
    # re-splitting it would break a config path containing spaces or quotes.
    process = subprocess.Popen(cmd, env=env)
    if wait:
        process.wait()
    return process.pid
def pause_space(params):
    """Pause the current Space when its name (after the last ``/``) starts with ``comp-``.

    Does nothing when ``SPACE_ID`` is not set in the environment.
    """
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Pausing space...")
    HfApi(token=params.token).pause_space(repo_id=space_id)
def delete_space(params):
    """Delete the current Space when its name (after the last ``/``) starts with ``comp-``.

    Does nothing when ``SPACE_ID`` is not set in the environment.
    """
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Deleting space...")
    HfApi(token=params.token).delete_repo(repo_id=space_id, repo_type="space")
def download_submission_info(params):
    """Fetch and parse ``submission_info/<team_id>.json`` from the competition dataset repo.

    Reads ``competition_id``, ``team_id`` and ``token`` from ``params`` and
    returns the decoded JSON content.
    """
    local_path = hf_hub_download(
        repo_id=params.competition_id,
        filename=f"submission_info/{params.team_id}.json",
        token=params.token,
        repo_type="dataset",
    )
    with open(local_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
def upload_submission_info(params, user_submission_info):
    """Serialize ``user_submission_info`` as indented JSON and upload it.

    The file is written to ``submission_info/<team_id>.json`` in the
    competition dataset repo, using the token carried by ``params``.
    """
    payload = io.BytesIO(json.dumps(user_submission_info, indent=4).encode("utf-8"))
    HfApi(token=params.token).upload_file(
        path_or_fileobj=payload,
        path_in_repo=f"submission_info/{params.team_id}.json",
        repo_id=params.competition_id,
        repo_type="dataset",
    )
def update_submission_status(params, status):
    """Set ``status`` on the submission matching ``params.submission_id`` and re-upload.

    Only the first matching submission is changed; the file is uploaded again
    even when no submission matches.
    """
    info = download_submission_info(params)
    target = next(
        (entry for entry in info["submissions"] if entry["submission_id"] == params.submission_id),
        None,
    )
    if target is not None:
        target["status"] = status
    upload_submission_info(params, info)
def update_submission_score(params, public_score, private_score):
    """Store both scores on the matching submission, mark it ``done`` and re-upload.

    Only the first submission whose ``submission_id`` equals
    ``params.submission_id`` is changed; the file is uploaded again even when
    no submission matches.
    """
    info = download_submission_info(params)
    target = next(
        (entry for entry in info["submissions"] if entry["submission_id"] == params.submission_id),
        None,
    )
    if target is not None:
        target["public_score"] = public_score
        target["private_score"] = private_score
        target["status"] = "done"
    upload_submission_info(params, info)
def monitor(func):
    """Decorate ``func`` so a failure marks the submission as failed and pauses the Space.

    The wrapper takes ``params`` from the ``params`` keyword argument or,
    when that is missing, from the first positional argument. If ``func``
    raises, the traceback is logged, the submission status is set to
    ``SubmissionStatus.FAILED`` and ``pause_space`` is called; the exception
    is not re-raised, so the wrapper then returns None.

    Args:
        func: Callable to guard.

    Returns:
        The wrapping callable, carrying ``func``'s name and docstring.
    """
    import functools  # local import keeps the block independent of the file header

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        params = kwargs.get("params", None)
        if params is None and len(args) > 0:
            params = args[0]

        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_message = f"""{func.__name__} has failed due to an exception: {traceback.format_exc()}"""
            logger.error(error_message)
            logger.error(str(e))
            update_submission_status(params, SubmissionStatus.FAILED.value)
            pause_space(params)

    return wrapper
def uninstall_requirements(requirements_fname):
    """Uninstall the packages marked with a single leading ``-`` in a requirements file.

    Lines of the form ``-package`` are written, without the dash, to
    ``uninstall.txt`` in the current working directory, which is then passed
    to ``pip uninstall -r uninstall.txt -y``. Lines starting with ``--`` are
    pip options (``install_requirements`` forwards them to ``pip install``)
    and are therefore not treated as packages to remove.

    Args:
        requirements_fname: Path of the requirements file. Nothing happens
            when the file does not exist.
    """
    if not os.path.exists(requirements_fname):
        return

    # Collect "-package" markers, skipping "--option" lines.
    with open(requirements_fname, "r", encoding="utf-8") as f:
        uninstall_list = [
            line[1:]
            for line in f
            if line.startswith("-") and not line.startswith("--")
        ]

    # Lines keep their original newline, so they can be written back as-is.
    with open("uninstall.txt", "w", encoding="utf-8") as f:
        f.writelines(uninstall_list)

    pipe = subprocess.Popen(
        [
            "pip",
            "uninstall",
            "-r",
            "uninstall.txt",
            "-y",
        ],
    )
    pipe.wait()
    logger.info("Requirements uninstalled.")
    return
def install_requirements(requirements_fname):
    """Install the requirements file, leaving out ``-package`` uninstall markers.

    Lines starting with a single ``-`` are dropped, lines starting with
    ``--`` are kept. The remaining lines are written to ``install.txt`` in the
    current working directory and installed with ``pip install -r``.
    Logs and returns when ``requirements_fname`` does not exist.
    """
    if not os.path.exists(requirements_fname):
        logger.info("No requirements.txt found. Skipping requirements installation.")
        return

    with open(requirements_fname, "r", encoding="utf-8") as source:
        kept_lines = [
            entry
            for entry in source
            if entry.startswith("--") or not entry.startswith("-")
        ]

    with open("install.txt", "w", encoding="utf-8") as target:
        target.writelines(kept_lines)

    installer = subprocess.Popen(["pip", "install", "-r", "install.txt"])
    installer.wait()
    logger.info("Requirements installed.")
    return
def is_user_admin(user_token, competition_organization):
    """Return True when the token's user belongs to ``competition_organization``."""
    organizations = token_information(token=user_token).get("orgs", [])
    return any(org == competition_organization for org in organizations)
def get_team_name(user_token, competition_id, hf_token):
    """Look up the name of the team the token's user belongs to.

    Reads ``user_team.json`` (user id -> team id) and, when the user has an
    entry, ``teams.json`` (team id -> metadata) from the competition dataset
    repo. Returns None when the user is not listed in ``user_team.json``.
    """
    member_id = token_information(token=user_token)["id"]

    mapping_path = hf_hub_download(
        repo_id=competition_id,
        filename="user_team.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(mapping_path, "r", encoding="utf-8") as handle:
        membership = json.load(handle)

    if member_id not in membership:
        return None
    team_id = membership[member_id]

    teams_path = hf_hub_download(
        repo_id=competition_id,
        filename="teams.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(teams_path, "r", encoding="utf-8") as handle:
        teams = json.load(handle)

    return teams[team_id]["name"]
def update_team_name(user_token, new_team_name, competition_id, hf_token):
    """Rename the team of the token's user and upload the updated ``teams.json``.

    Raises:
        Exception: If the user has no entry in ``user_team.json``.

    Returns:
        ``new_team_name``.
    """
    member_id = token_information(token=user_token)["id"]

    mapping_path = hf_hub_download(
        repo_id=competition_id,
        filename="user_team.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(mapping_path, "r", encoding="utf-8") as handle:
        membership = json.load(handle)

    if member_id not in membership:
        raise Exception("User is not part of a team")
    team_id = membership[member_id]

    teams_path = hf_hub_download(
        repo_id=competition_id,
        filename="teams.json",
        token=hf_token,
        repo_type="dataset",
    )
    with open(teams_path, "r", encoding="utf-8") as handle:
        teams = json.load(handle)

    teams[team_id]["name"] = new_team_name
    payload = io.BytesIO(json.dumps(teams, indent=4).encode("utf-8"))
    HfApi(token=hf_token).upload_file(
        path_or_fileobj=payload,
        path_in_repo="teams.json",
        repo_id=competition_id,
        repo_type="dataset",
    )
    return new_team_name
class TeamAlreadyExistsError(Exception):
    """Raised to signal that a team already exists."""
class TeamFileApi:
    """
    Team File Management API Class

    Team data lives as JSON files in the competition dataset repo:
    ``user_team.json`` (user id -> team id), ``teams.json`` (team id ->
    metadata), ``submission_info/<team_id>.json`` and, optionally,
    ``team_id_whitelist.json``.

    This class manages all team-related operations:
    - Create new teams
    - Query team information
    - Update team names
    - Read the team whitelist
    """

    def __init__(self, hf_token: str, competition_id: str):
        self.hf_token = hf_token
        self.competition_id = competition_id
        # Serializes the download-modify-upload cycles of this instance.
        self._lock = threading.Lock()

    def _download_json(self, filename: str) -> Any:
        """Download ``filename`` from the competition dataset repo and parse it as JSON."""
        local_path = hf_hub_download(
            repo_id=self.competition_id,
            filename=filename,
            token=self.hf_token,
            repo_type="dataset",
        )
        with open(local_path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _to_json_bytes(data: Any) -> bytes:
        """Serialize ``data`` as indented, UTF-8 encoded JSON."""
        return json.dumps(data, indent=4).encode("utf-8")

    def _upload_bytes(self, api: HfApi, payload: bytes, path_in_repo: str) -> None:
        """Upload ``payload`` to ``path_in_repo`` in the competition dataset repo."""
        api.upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=path_in_repo,
            repo_id=self.competition_id,
            repo_type="dataset",
        )

    def _get_team_info(self, user_id: str) -> Optional[Dict[str, Any]]:
        """
        Get team information by user ID

        Returns None when the user has no entry in ``user_team.json``.
        """
        user_team = self._download_json("user_team.json")
        if user_id not in user_team:
            return None

        team_id = user_team[user_id]
        team_metadata = self._download_json("teams.json")
        return team_metadata[team_id]

    def _create_team(self, user_id: str, team_name: str, other_data: Dict[str, Any]) -> str:
        """
        Create new team (internal method)

        Registers ``user_id`` as the leader and only member of a new team with
        a random UUID, then uploads the updated ``user_team.json`` and
        ``teams.json`` plus an empty ``submission_info/<team_id>.json``.

        Returns:
            The id of the new team.
        """
        with self._lock:  # Use lock to ensure thread safety
            user_team = self._download_json("user_team.json")
            team_metadata = self._download_json("teams.json")

            # Create new team ID
            team_id = str(uuid.uuid4())
            user_team[user_id] = team_id

            # Create team metadata
            team_metadata[team_id] = {
                "id": team_id,
                "name": team_name,
                "members": [user_id],
                "leader": user_id,
                "other_data": other_data,
            }

            team_submission_info = {"id": team_id, "submissions": []}

            # Serialize everything before the first upload so that a
            # serialization error cannot leave the repo half-updated.
            user_team_bytes = self._to_json_bytes(user_team)
            team_metadata_bytes = self._to_json_bytes(team_metadata)
            team_submission_info_bytes = self._to_json_bytes(team_submission_info)

            api = HfApi(token=self.hf_token)
            self._upload_bytes(api, user_team_bytes, "user_team.json")
            self._upload_bytes(api, team_metadata_bytes, "teams.json")
            self._upload_bytes(api, team_submission_info_bytes, f"submission_info/{team_id}.json")
            return team_id

    def create_team(self, user_token: str, team_name: str, other_data: Dict[str, Any]) -> str:
        """
        Create team using user token (public interface)
        """
        user_info = token_information(token=user_token)
        return self._create_team(user_info["id"], team_name, other_data)

    def get_team_info(self, user_token: str) -> Optional[Dict[str, Any]]:
        """
        Get user's team information
        """
        user_info = token_information(token=user_token)
        return self._get_team_info(user_info["id"])

    def update_team_name(self, user_token, new_team_name):
        """
        Update team name

        Raises:
            Exception: If the user is not part of a team.

        Returns:
            ``new_team_name``.
        """
        user_info = token_information(token=user_token)
        team_info = self._get_team_info(user_info["id"])
        if team_info is None:
            # Same error as the module-level update_team_name, instead of an
            # opaque TypeError from subscripting None below.
            raise Exception("User is not part of a team")

        with self._lock:
            team_metadata = self._download_json("teams.json")
            team_metadata[team_info["id"]]["name"] = new_team_name
            api = HfApi(token=self.hf_token)
            self._upload_bytes(api, self._to_json_bytes(team_metadata), "teams.json")
        return new_team_name

    @cached(cache=TTLCache(maxsize=1, ttl=60))  # Cache for 1 minute
    def get_team_white_list(self) -> List[str]:
        """
        Get team whitelist (cached for 1 minute)

        Returns an empty list when ``team_id_whitelist.json`` cannot be
        downloaded or parsed.
        """
        try:
            return self._download_json("team_id_whitelist.json")
        except Exception as err:
            # Return empty list if whitelist file doesn't exist
            logger.debug(f"Team whitelist unavailable: {err!r}")
            return []
# Module-wide TeamFileApi instance, configured from HF_TOKEN and COMPETITION_ID
team_file_api = TeamFileApi(
    hf_token=os.environ.get("HF_TOKEN"),
    competition_id=os.environ.get("COMPETITION_ID"),
)
class LeaderboardApi:
    """
    Simplified leaderboard API class, similar to reference code design
    Automatically displays the best score for each team without manual selection
    """

    # Score used to pick each team's best submission and to order the leaderboard.
    _RANKING_KEY = "extended_pdm_score_combined"

    def __init__(self, hf_token: str, competition_id: str):
        self.hf_token = hf_token
        self.competition_id = competition_id
        self.api = HfApi(token=hf_token)

    @cached(cache=TTLCache(maxsize=1, ttl=300))  # Cache for 5 minutes
    def get_leaderboard(self) -> pd.DataFrame:
        """
        Get leaderboard data

        Returns:
            pd.DataFrame: Leaderboard DataFrame with the columns ``rank``,
            ``team_name``, one column per score and ``submission_datetime``;
            an empty DataFrame when no scores are available.
        """
        all_scores = self._get_all_scores()
        if not all_scores:
            return pd.DataFrame()

        df = pd.DataFrame(all_scores)

        # One row per team already; order by the ranking score, best first.
        if self._RANKING_KEY in df.columns:
            df = df.sort_values(by=[self._RANKING_KEY], ascending=[False])

        # Add ranking
        df = df.reset_index(drop=True)
        df["rank"] = range(1, len(df) + 1)

        # Keep 4 decimal places
        non_score_columns = ("team_id", "team_name", "submission_datetime", "rank")
        score_columns = [col for col in df.columns if col not in non_score_columns]
        for col in score_columns:
            if df[col].dtype in ["float64", "float32", "int64", "int32"]:
                df[col] = df[col].round(4)

        # Format the timestamp before selecting columns, so the assignment
        # happens on the full frame rather than on a column slice.
        if "submission_datetime" in df.columns:
            df["submission_datetime"] = pd.to_datetime(df["submission_datetime"]).dt.strftime("%Y-%m-%d %H:%M:%S")

        # Reorder columns: rank, team_name, all score columns, submission_datetime
        available_columns = ["rank", "team_name"] + score_columns + ["submission_datetime"]
        return df[available_columns]

    @classmethod
    def _best_team_record(
        cls,
        team_id: str,
        team_entry: Dict[str, Any],
        submissions: List[Dict[str, Any]],
    ) -> Optional[Dict[str, Any]]:
        """Return the best successful, scored submission of one team, or None.

        Each candidate record holds ``team_id``, ``team_name``,
        ``submission_datetime`` and every key of the submission's
        ``public_score``. Records without the ranking score rank below all
        records that have one; ties keep the earliest entry of the list.
        """
        records = []
        for sub in submissions:
            if sub["status"] != SubmissionStatus.SUCCESS.value:
                continue
            # Check if score data exists
            if "public_score" not in sub:
                continue
            record = {
                "team_id": team_id,
                "team_name": team_entry["name"],
                "submission_datetime": sub["datetime"],
            }
            # Expand all public_score key-value pairs
            record.update(sub["public_score"].items())
            records.append(record)

        if not records:
            return None
        return max(records, key=lambda rec: rec.get(cls._RANKING_KEY, float("-inf")))

    def _get_all_scores(self) -> List[Dict[str, Any]]:
        """Get score data for all teams

        Returns one record per team that has a successful, scored submission.
        Any error is logged and results in an empty list.
        """
        try:
            # Download team metadata
            team_metadata_path = hf_hub_download(
                repo_id=self.competition_id,
                filename="teams.json",
                token=self.hf_token,
                repo_type="dataset",
            )
            with open(team_metadata_path, "r", encoding="utf-8") as f:
                team_metadata = json.load(f)

            # Download all submission information
            submissions_folder = snapshot_download(
                repo_id=self.competition_id,
                allow_patterns="submission_info/*.json",
                token=self.hf_token,
                repo_type="dataset",
            )
            submission_jsons = glob.glob(os.path.join(submissions_folder, "submission_info/*.json"))

            all_scores = []
            for json_path in submission_jsons:
                with open(json_path, "r", encoding="utf-8") as f:
                    submission_data = json.load(f)

                team_id = submission_data["id"]
                # Check if team exists in metadata
                if team_id not in team_metadata:
                    logger.warning(f"Team {team_id} not found in team metadata, skipping...")
                    continue

                best_submission = self._best_team_record(
                    team_id,
                    team_metadata[team_id],
                    submission_data["submissions"],
                )
                if best_submission is not None:
                    all_scores.append(best_submission)
            return all_scores
        except Exception as e:
            logger.error(f"Error getting leaderboard scores: {e}")
            return []
# Module-wide LeaderboardApi instance, configured from HF_TOKEN and COMPETITION_ID
leaderboard_api = LeaderboardApi(
    os.environ.get("HF_TOKEN"),
    os.environ.get("COMPETITION_ID"),
)