Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import json | |
from datetime import datetime | |
import PyPDF2 | |
# import google. as genai | |
from google import genai | |
# --- Configuration and Setup --- | |
# Use a writable directory like /tmp for containerized environments | |
CLASSROOMS_DIR = "/tmp/classrooms" | |
if not os.path.exists(CLASSROOMS_DIR): | |
os.makedirs(CLASSROOMS_DIR) | |
# --- Helper Functions --- | |
def extract_text_from_pdf(pdf_files): | |
""" | |
Extracts text from a list of uploaded PDF files with improved error handling. | |
""" | |
full_text = "" | |
files_processed = 0 | |
if not pdf_files: | |
return "" | |
for pdf_file in pdf_files: | |
try: | |
# The UploadedFile object is a file-like object. Rewind it first. | |
pdf_file.seek(0) | |
pdf_reader = PyPDF2.PdfReader(pdf_file) | |
if not pdf_reader.pages: | |
st.warning(f"Warning: '{pdf_file.name}' contains no pages.") | |
continue | |
page_text = "" | |
for page in pdf_reader.pages: | |
text = page.extract_text() | |
if text: | |
page_text += text + "\n" | |
if page_text: | |
full_text += page_text | |
files_processed += 1 | |
else: | |
st.warning(f"Warning: Could not extract any text from '{pdf_file.name}'. The file might be image-based or have an unusual format.") | |
except PyPDF2.errors.PdfReadError: | |
st.error(f"Error: '{pdf_file.name}' is not a valid PDF file or is corrupted.") | |
except Exception as e: | |
st.error(f"An unexpected error occurred while processing {pdf_file.name}: {e}. The file might be password-protected.") | |
if files_processed > 0: | |
st.info(f"Successfully extracted text from {files_processed} out of {len(pdf_files)} file(s).") | |
return full_text | |
def save_classroom_data(class_name, api_key, context): | |
"""Saves the classroom data (context, API key, and chat log).""" | |
class_dir = os.path.join(CLASSROOMS_DIR, class_name) | |
if not os.path.exists(class_dir): | |
os.makedirs(class_dir) | |
# Save context | |
with open(os.path.join(class_dir, "context.txt"), "w", encoding="utf-8") as f: | |
f.write(context) | |
# Save API key | |
with open(os.path.join(class_dir, "api_key.txt"), "w") as f: | |
f.write(api_key) | |
# Initialize chat log | |
with open(os.path.join(class_dir, "chat_log.json"), "w") as f: | |
json.dump([], f) | |
return True | |
def get_gemini_response(api_key, context, chat_history, question): | |
"""Generates a response from the Gemini API based on the context.""" | |
try: | |
# genai.configure(api_key=api_key) | |
client = genai.Client(api_key=api_key) | |
# model = genai.GenerativeModel('gemini-2.5-flash') | |
# Construct a more detailed prompt for the model | |
prompt = ( | |
"You are a helpful classroom assistant. Your role is to answer questions based ONLY on the provided context. " | |
# "If the answer is not found in the context, you must state that you cannot answer the question based on the provided material. " | |
"Do not use any external knowledge.\n\n" | |
f"**Context:**\n{context}\n\n" | |
"**Chat History:**\n" | |
) | |
for message in chat_history: | |
prompt += f"{message['role']}: {message['content']}\n" | |
prompt += f"**New Question:**\n{question}\n\n**Answer:**" | |
response = client.models.generate_content( | |
model="gemini-2.5-pro", contents=prompt | |
) | |
# response = model.generate_content(prompt) | |
return response.text | |
except Exception as e: | |
return f"An error occurred with the Gemini API: {e}" | |
def log_interaction(class_name, user_query, bot_response): | |
"""Logs the student's interaction in the classroom's JSON file.""" | |
log_file = os.path.join(CLASSROOMS_DIR, class_name, "chat_log.json") | |
try: | |
with open(log_file, "r+") as f: | |
logs = json.load(f) | |
logs.append({ | |
"timestamp": datetime.now().isoformat(), | |
"user_query": user_query, | |
"bot_response": bot_response | |
}) | |
f.seek(0) | |
json.dump(logs, f, indent=4) | |
except (FileNotFoundError, json.JSONDecodeError): | |
# If file is empty or corrupt, start a new log | |
with open(log_file, "w") as f: | |
json.dump([{ | |
"timestamp": datetime.now().isoformat(), | |
"user_query": user_query, | |
"bot_response": bot_response | |
}], f, indent=4) | |
# --- Streamlit UI --- | |
st.set_page_config(page_title="Classroom Chatbot", layout="wide") | |
# Initialize session state variables | |
if 'role' not in st.session_state: | |
st.session_state.role = None | |
if 'class_name' not in st.session_state: | |
st.session_state.class_name = None | |
if 'chat_history' not in st.session_state: | |
st.session_state.chat_history = [] | |
# --- Main App Logic --- | |
# Role Selection | |
if st.session_state.role is None: | |
st.title("Welcome to the Classroom Chatbot! π") | |
st.write("Please select your role to begin.") | |
col1, col2 = st.columns(2) | |
with col1: | |
if st.button("I am a Faculty Member", use_container_width=True): | |
st.session_state.role = "Faculty" | |
st.rerun() | |
with col2: | |
if st.button("I am a Student", use_container_width=True): | |
st.session_state.role = "Student" | |
st.rerun() | |
# --- Faculty View --- | |
elif st.session_state.role == "Faculty": | |
st.title("π Faculty Dashboard") | |
st.header("Create a New Classroom") | |
# Input fields outside of form for better file handling | |
class_name = st.text_input("Classroom Name / Code", help="A unique name for your class, e.g., 'CS101-Fall24'") | |
api_key = st.text_input("Google Gemini API Key", type="password", help="Your API key will be stored securely for this classroom's use.") | |
uploaded_files = st.file_uploader( | |
"Upload Course Materials (PDFs only)", | |
type="pdf", | |
accept_multiple_files=True, | |
help="Select one or more PDF files containing course materials" | |
) | |
# Show uploaded files | |
if uploaded_files: | |
st.write("**Uploaded Files:**") | |
for file in uploaded_files: | |
st.write(f"- {file.name} ({file.size} bytes)") | |
# Create classroom button | |
if st.button("Create Classroom", type="primary"): | |
if not class_name or not api_key or not uploaded_files: | |
st.warning("Please fill out all fields and upload at least one document.") | |
elif os.path.exists(os.path.join(CLASSROOMS_DIR, class_name)): | |
st.error(f"A classroom with the name '{class_name}' already exists. Please choose a different name.") | |
else: | |
with st.spinner("Processing documents and setting up classroom..."): | |
context = extract_text_from_pdf(uploaded_files) | |
if context: | |
save_classroom_data(class_name, api_key, context) | |
st.success(f"Classroom '{class_name}' created successfully!") | |
st.info(f"Students can now join using the code: **{class_name}**") | |
else: | |
st.error("Could not extract any text from the uploaded PDFs. Please check the files and try again.") | |
if st.button("Go Back to Role Selection"): | |
for key in st.session_state.keys(): | |
del st.session_state[key] | |
st.rerun() | |
# --- Student View --- | |
elif st.session_state.role == "Student": | |
st.title("π§βπ Student Portal") | |
# Student joins a class | |
if st.session_state.class_name is None: | |
st.header("Join a Classroom") | |
class_name_input = st.text_input("Enter the Classroom Code provided by your faculty:") | |
if st.button("Join"): | |
class_dir = os.path.join(CLASSROOMS_DIR, class_name_input) | |
if os.path.isdir(class_dir): | |
st.session_state.class_name = class_name_input | |
st.success(f"Successfully joined classroom: {class_name_input}") | |
st.rerun() | |
else: | |
st.error("Invalid classroom code. Please check with your faculty.") | |
if st.button("Go Back to Role Selection"): | |
for key in st.session_state.keys(): | |
del st.session_state[key] | |
st.rerun() | |
# Chat interface for joined students | |
else: | |
st.header(f"Chatbot for: {st.session_state.class_name}") | |
st.markdown("Ask questions about the course materials provided by your faculty.") | |
# Load classroom data | |
class_dir = os.path.join(CLASSROOMS_DIR, st.session_state.class_name) | |
try: | |
with open(os.path.join(class_dir, "context.txt"), "r", encoding="utf-8") as f: | |
context = f.read() | |
with open(os.path.join(class_dir, "api_key.txt"), "r") as f: | |
api_key = f.read().strip() | |
except FileNotFoundError: | |
st.error("Classroom data is missing. Please contact your faculty.") | |
st.stop() | |
# Display chat history | |
for message in st.session_state.chat_history: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
# Chat input | |
if prompt := st.chat_input("What is your question?"): | |
# Add user message to chat history | |
st.session_state.chat_history.append({"role": "user", "content": prompt}) | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Get bot response | |
with st.chat_message("assistant"): | |
with st.spinner("Thinking..."): | |
response = get_gemini_response(api_key, context, st.session_state.chat_history, prompt) | |
st.markdown(response) | |
# Add bot response to chat history and log it | |
st.session_state.chat_history.append({"role": "assistant", "content": response}) | |
log_interaction(st.session_state.class_name, prompt, response) | |
if st.button("Leave Classroom"): | |
st.session_state.class_name = None | |
st.session_state.chat_history = [] | |
st.rerun() |