import gradio as gr import pandas as pd import time import torch import os import torchvision.transforms as transforms from torchvision import datasets import torch.nn.functional as F from torch.utils.data import DataLoader BASE = {'ottawa':(45.30326753851309,-75.93640391349997), 'ali_home':(37.88560412289598,-122.30218612514359), 'josh_home':(37.8697406, -122.30218612514359), 'cory':(37.8697406,-122.281570)} def get_base(filename): if "home" in filename: return BASE["ali_home"] elif "ottawa" in filename: return BASE["ottawa"] elif "josh" in filename: return BASE["josh_home"] else: return BASE["cory"] from datasets import load_dataset # Load dataset dataset = load_dataset("IndoorOutdoor/1090dumpData") # Get dataset cache directory (where files are stored) dataset_dir = dataset.cache_files print("Dataset directory:", dataset_dir) #for root, dirs, files in os.walk(dataset_dir): # for file in files: # print(os.path.join(root, file)) # Print full file paths def read_dump1090_file(filename): with open(filename, 'r') as file: adsb_data = file.read().split('*') adsb_data = adsb_data[1:] # Sample data in adsb_data # 8da60e809910c81790b40c0c3a21; # CRC: 000000 # RSSI: -27.8 dBFS # Score: 1800 # Time: 20160.42us # DF:17 AA:A60E80 CA:5 ME:9910C81790B40C # Extended Squitter Airborne velocity over ground, subsonic (19/1) # ICAO Address: A60E80 (Mode S / ADS-B) # Air/Ground: airborne # GNSS delta: 275 ft # Heading: 47 # Speed: 273 kt groundspeed # Vertical rate: 2816 ft/min GNSS adsb_dict = {} for message in adsb_data: if "ICAO Address" not in message: continue icao_id = message.split('ICAO Address: ')[1].split(' (')[0] time = float(message.split('Time: ')[1].split('us')[0]) rssi = float(message.split('RSSI: ')[1].split(' dBFS')[0]) lat = lon = None try: if "CPR latitude:" in message: lat = float(message.split('CPR latitude:')[1].split('(')[0]) lon = float(message.split('CPR longitude:')[1].split('(')[0]) except: pass alt = None try: if "Altitude: " in message and "ft barometric" in message: alt = float(message.split('Altitude:')[1].split('ft')[0]) if "Baro altitude: " in message and "ft" in message: alt = float(message.split('altitude:')[1].split('ft')[0]) except: pass #print("non standard alt") if icao_id in adsb_dict: last_rssi = adsb_dict[icao_id][-1][1] delta_rssi = abs(last_rssi - rssi) adsb_dict[icao_id].append([time , rssi, delta_rssi, lat, lon, alt]) else: adsb_dict[icao_id] = [[time , rssi, 0, lat, lon, alt]] return adsb_dict #Compare the model output with ground truth #return TP, FP, FN, TN #Computes stats based on sectors rathere than just indoor vs outdoor def compute_stats_sector(sectors_model, sector_gt): TP = FP = FN = TN = 0 ignored = 0 for i in range(len(sector_gt)): if sector_gt[i] == 1: if sectors_model[i] > 0 or sectors_model[(i+1) % 8] > 0 or sectors_model[(i-1) % 8] > 0 : TP += 1 else: FN += 1 else: if sectors_model[i] > 0: if sector_gt[(i-1) % 8] > 0 or sector_gt[(i+1) % 8] > 0: TP += 1 continue FP += 1 else: TN += 1 NUM_SECTORS = 8 - ignored return [TP / NUM_SECTORS, FP / NUM_SECTORS, FN / NUM_SECTORS, TN / NUM_SECTORS] #Compare the model output with ground truth #return TP, FP, FN, TN #This fuction compute stats when the model is binary i.e., outputs only indoor vs outdoor def compute_stats_in_out(sectors_model, indoor_gt): if indoor_gt: #if groundtruth is indoor for i in range(len(sectors_model)): if sectors_model[i]: return [0,1,0,0] return [0,0,0,1] else: #if outdoor for i in range(len(sectors_model)): if sectors_model[i]: return [1,0,0,0] return [0,0,1,0] def read_configuration(filename): with open(filename, 'r') as file: data = file.read().split('\n') data = data[1:] #ignore the header exp = {} for line in data: if len(line) == 0: continue tokens =line.split(',') file = tokens[0] scenario = tokens[1] indoor = True if tokens[2] == "TRUE" else 0 exp[scenario] = {'sectors':[1 if x == "TRUE" else 0 for x in tokens[3:]], 'indoor':indoor, "file":file} return exp # Initialize leaderboard storage leaderboard_data = pd.DataFrame(columns=["Username", "Execution Time (s)", "Accuracy", "Status"]) # `data` directory for persistent storage HF_STORAGE_DIR = "data" if not os.path.exists(HF_STORAGE_DIR): os.makedirs(HF_STORAGE_DIR) def evaluate_model(username, file): global leaderboard_data username = username.strip() if not username: return leaderboard_data.values.tolist() if isinstance(file, str): temp_file_path = file else: temp_file_path = os.path.join(HF_STORAGE_DIR, f"{username}_model.pt") with open(temp_file_path, "wb") as temp_file: temp_file.write(file.read()) try: exp = read_configuration(dataset["metadata.csv"]) # stats_model_sectors = [] # stats_model_in_out = [] # for key in exp: # filename = exp[key]['file'] # #Groundtruth for each dataset # indoor_gt = exp[key]['indoor'] # sectors_gt = exp[key]["sectors"] # print("Dataset: ", filename) # print("Indoor:\t", indoor_gt) # print("Ground Truth sectors:\t", sectors_gt) # #Do clustering for each sector. This will be used later to figure out sector-based and non-sector-based classification # sectors_model = # TODO: CALL USER FTN # print("Estimated sectors:\t", sectors_model) # stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt)) # stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt)) # print("----------------------------") # TP = np.mean([x[0] for x in stats_model_sectors]) # FP = np.mean([x[1] for x in stats_model_sectors]) # FN = np.mean([x[2] for x in stats_model_sectors]) # TN = np.mean([x[3] for x in stats_model_sectors]) # print(TP, FP, FN, TN) # TP = np.mean([x[0] for x in stats_model_in_out]) # FP = np.mean([x[1] for x in stats_model_in_out]) # FN = np.mean([x[2] for x in stats_model_in_out]) # TN = np.mean([x[3] for x in stats_model_in_out]) # print(TP, FP, FN, TN) # end_time = time.time() # exec_time = end_time - start_time # print(f"Execution Time: {exec_time} seconds") except Exception as e: leaderboard_data = pd.concat([leaderboard_data, pd.DataFrame([[username, float("inf"), 0, f"Model Load Error: {str(e)}"]], columns=["Username", "Execution Time (s)", "Accuracy", "Status"])], ignore_index=True) return leaderboard_data.values.tolist() # Measure execution time start_time = time.time() correct = 0 total = 0 # Run inference on test dataset with torch.no_grad(): for images, labels in test_loader: outputs = model(images) _, predicted = torch.max(outputs, 1) correct += (predicted == labels).sum().item() total += labels.size(0) execution_time = round(time.time() - start_time, 4) accuracy = round(100 * correct / total, 2) status = "Success" if accuracy > 0 else "Incorrect Model" # Append to leaderboard new_entry = pd.DataFrame([[username, execution_time, accuracy, status]], columns=["Username", "Execution Time (s)", "Accuracy", "Status"]) leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True) # Sort leaderboard: prioritize accuracy first, then execution time leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True]).reset_index(drop=True) return leaderboard_data.values.tolist() # Create Gradio UI with gr.Blocks() as demo: gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)") with gr.Row(): username_input = gr.Textbox(label="Username") file_input = gr.File(label="Upload PyTorch Model (.pt)") requirements_input = gr.File(label="Upload requirements.txt") submit_button = gr.Button("Submit Model") leaderboard_display = gr.Dataframe(headers=["Username", "Execution Time (s)", "Accuracy", "Status"], label="Leaderboard") def update_leaderboard(username, file): updated_leaderboard = evaluate_model(username, file) return updated_leaderboard submit_button.click(fn=update_leaderboard, inputs=[username_input, file_input], outputs=leaderboard_display) # Launch Gradio app demo.launch()