# TalentLens
"""Streamlit entry point for TalentLens.AI resume screening.

The page lets a user upload PDF resumes, enter a job description, run the
screening pipeline, download a shortlist report, and wake up the inference
endpoint the pipeline depends on.
"""

import os
import time
from io import BytesIO

import fitz  # PyMuPDF
import requests
import streamlit as st
from dotenv import load_dotenv

from config import (
    supabase,
    HF_API_TOKEN,
    HF_HEADERS,
    HF_ENDPOINTS,
    check_endpoint_status,
)
from utils.parser import parse_resume, extract_email, summarize_resume
from utils.hybrid_extractor import extract_resume_sections
from utils.builder import build_resume_from_data
from utils.screening import evaluate_resumes
from utils.reporting import (
    generate_pdf_report,
    generate_interview_questions_from_summaries,
)

# Maximum number of resumes accepted in a single upload.
MAX_RESUMES = 10

# Identifier of the inference endpoint whose status is shown on the page.
DEFAULT_ENDPOINT_NAME = "vzwjawyxvu030jsw"

# Upper bound (seconds) for the health probe so that an unresponsive endpoint
# cannot block the Streamlit script run forever.
HEALTH_CHECK_TIMEOUT_SECONDS = 15

# Pause (seconds) between probing the endpoint and re-reading its status.
STATUS_REFRESH_DELAY_SECONDS = 2


def _refresh_endpoint_status(endpoint_name: str) -> dict:
    """Fetch the endpoint status, store it in session state, and return it."""
    status = check_endpoint_status(endpoint_name)
    st.session_state['endpoint_status'] = {endpoint_name: status}
    return status


def toggle_endpoint(endpoint_name: str, action: str) -> None:
    """Probe an endpoint's health URL and report the outcome in the UI.

    The probe is a HEAD request to ``<endpoint url>/health``. A 503 response
    is reported as "starting", a 200 as "running"; in both cases the stored
    status in ``st.session_state['endpoint_status']`` is refreshed after a
    short pause. Any other response, and any exception, is reported with
    ``st.error``.

    Args:
        endpoint_name: Key into ``HF_ENDPOINTS``.
        action: Verb used only in the failure message (e.g. ``"start"``);
            the request that is sent does not depend on it.
    """
    try:
        endpoint_info = HF_ENDPOINTS[endpoint_name]
        url = f"{endpoint_info['url']}/health"

        response = requests.head(
            url,
            headers=HF_HEADERS,
            timeout=HEALTH_CHECK_TIMEOUT_SECONDS,
        )

        if response.status_code == 503:
            st.info("🚀 Starting endpoint... This may take 5-6 minutes. Click on 'Start' again to refresh status.")
        elif response.status_code == 200:
            st.success("✅ Endpoint is running")
        else:
            # A HEAD response carries no body, so fall back to the status
            # line to keep the message informative.
            detail = response.text or f"HTTP {response.status_code} {response.reason}"
            st.error(f"❌ Failed to {action} endpoint: {detail}")
            return

        # Wait briefly before refreshing the stored status.
        time.sleep(STATUS_REFRESH_DELAY_SECONDS)
        _refresh_endpoint_status(endpoint_name)
    except Exception as e:
        # UI boundary: show the failure (unknown endpoint name, network
        # error, timeout, ...) instead of crashing the page.
        st.error(f"❌ Failed to {action} endpoint: {e}")


def _render_evaluation_results(shortlisted, removed_candidates) -> None:
    """Render the shortlist, interview questions, report and rejections."""
    if not shortlisted:
        st.warning("⚠️ No resumes matched the required keywords.")
    else:
        st.subheader("✅ Shortlisted Candidates:")
        for candidate in shortlisted:
            st.write(f"**{candidate['name']}**")

        # Generate interview questions
        questions = generate_interview_questions_from_summaries(shortlisted)
        st.subheader("🧠 Suggested Interview Questions:")
        for q in questions:
            st.markdown(f"{q}")

        # Downloadable PDF report
        pdf_report = generate_pdf_report(shortlisted, questions)
        st.download_button("Download Shortlist Report", pdf_report, "shortlist.pdf")

    # Rejection reasons are shown whether or not anything was shortlisted.
    if removed_candidates:
        st.subheader("❌ Resumes Removed:")
        for removed in removed_candidates:
            st.write(f"**{removed['name']}** - {removed['reason']}")


def _render_endpoint_controls(endpoint_name: str) -> None:
    """Show a Start button while the endpoint is stopped or in error."""
    current_status = _refresh_endpoint_status(endpoint_name)
    state = current_status.get('status', 'unknown')

    start_button = st.empty()  # Placeholder for the Start button
    if state not in ('stopped', 'error'):
        return

    clicked = start_button.button(
        "▶️ Start",
        key=f"start_{endpoint_name}",
        use_container_width=True,
    )
    if not clicked:
        return

    toggle_endpoint(endpoint_name, "start")

    # Refresh status after starting
    new_status = _refresh_endpoint_status(endpoint_name)
    new_state = new_status.get('status')
    if new_state == 'running':
        st.success("✅ Endpoint is running")
    elif new_state == 'starting':
        st.info("🚀 Starting endpoint... This may take 5-6 minutes. Click on 'Start' again to refresh status.")
    elif new_state == 'error':
        st.error(f"❌ Error: {new_status.get('error', 'Unknown error')}")


# ------------------------- Main App Function -------------------------
def main() -> None:
    """Render the TalentLens.AI page and handle its button actions."""
    st.set_page_config(
        page_title="TalentLens.AI",
        layout="centered",
        initial_sidebar_state="collapsed",
    )

    # Hide sidebar completely with CSS
    # NOTE(review): the CSS payload appears to be missing from this string —
    # restore the intended <style> block.
    st.markdown(""" """, unsafe_allow_html=True)

    # NOTE(review): these headings contain no markup although
    # unsafe_allow_html is set — the surrounding HTML tags look lost; confirm.
    st.markdown("\n\nTalentLens.AI\n\n", unsafe_allow_html=True)
    st.divider()
    st.markdown("\n\nAI-Powered Intelligent Resume Screening\n\n", unsafe_allow_html=True)

    # Upload resumes (limit: MAX_RESUMES files)
    uploaded_files = st.file_uploader(
        f"Upload Resumes (PDF Only, Max: {MAX_RESUMES})",
        accept_multiple_files=True,
        type=["pdf"],
    )
    if uploaded_files and len(uploaded_files) > MAX_RESUMES:
        st.error(f"⚠️ You can upload a maximum of {MAX_RESUMES} resumes at a time.")
        return

    # Input job description
    job_description = st.text_area("Enter Job Description")

    # Main action buttons
    col1, col2 = st.columns(2)
    with col1:
        # Evaluation trigger
        evaluate_clicked = st.button(
            "\U0001F4CA Evaluate Resumes",
            type="primary",
            use_container_width=True,
        )
    with col2:
        # Format Resume redirect button
        format_clicked = st.button("\U0001F4C4 Format Resume", use_container_width=True)

    # Handle Format Resume redirect
    if format_clicked:
        st.switch_page("pages/Template.py")

    # Handle Evaluate Resumes
    if evaluate_clicked:
        # Treat a whitespace-only description as missing.
        if not job_description or not job_description.strip():
            st.error("⚠️ Please enter a job description.")
            return

        if not uploaded_files:
            st.error("⚠️ Please upload at least one resume.")
            return

        st.write("### Evaluating Resumes...")

        # Resume evaluation
        shortlisted, removed_candidates = evaluate_resumes(uploaded_files, job_description)
        _render_evaluation_results(shortlisted, removed_candidates)

    # Endpoint status and Start button
    _render_endpoint_controls(DEFAULT_ENDPOINT_NAME)


# ------------------------- Run the App -------------------------
if __name__ == "__main__":
    main()