Dua Rajper
Update app.py
4a5482a verified
import streamlit as st
import os
import google.generativeai as genai
from dotenv import load_dotenv
import time
from typing import Any, List, Optional
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
# Load environment variables
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Configure Generative AI model
if GOOGLE_API_KEY:
genai.configure(api_key=GOOGLE_API_KEY)
else:
st.error(
"Google AI Studio API key not found. Please add it to your .env file. "
"You can obtain an API key from https://makersuite.google.com/."
)
st.stop()
st.title("Embeddings and Vector Search Demo")
st.subheader("Explore Embeddings and Vector Databases")
# Sidebar for explanations
with st.sidebar:
st.header("Embeddings and Vector Search")
st.markdown(
"""
This app demonstrates how embeddings and vector databases can be used for various tasks.
"""
)
st.subheader("Key Concepts:")
st.markdown(
"""
- **Embeddings**: Numerical representations of text, capturing semantic meaning.
- **Vector Databases**: Databases optimized for storing and querying vectors (simulated here).
- **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation.
- **Cosine Similarity**: A measure of similarity between two vectors.
- **Neural Networks**: Using embeddings as input for classification.
"""
)
st.subheader("Whitepaper Insights")
st.markdown(
"""
- Efficient similarity search using vector indexes (e.g., ANN).
- Handling large datasets and scalability considerations.
- Applications of embeddings: search, recommendation, classification, etc.
"""
)
# --- Helper Functions ---
def code_block(text: str, language: str = "text") -> None:
"""Displays text as a formatted code block in Streamlit."""
st.markdown(f"```{language}\n{text}\n```", unsafe_allow_html=True)
def display_response(response: Any) -> None:
"""Displays the model's response."""
if response and hasattr(response, "text"):
st.subheader("Generated Response:")
st.markdown(response.text)
else:
st.error("Failed to generate a response.")
def generate_embeddings(texts: List[str], model_name: str = "models/embedding-001") -> Optional[List[List[float]]]:
"""Generates embeddings for a list of texts using a specified model.
Args:
texts: List of text strings.
model_name: Name of the embedding model.
Returns:
List of embeddings (list of floats) or None on error.
"""
try:
# Use the embedding model directly
embeddings = []
for text in texts:
result = genai.embed_content(
model=model_name,
content=text,
task_type="retrieval_document" # or "retrieval_query" for queries
)
embeddings.append(result['embedding'])
return embeddings
except Exception as e:
st.error(f"Error generating embeddings with model '{model_name}': {e}")
return None
def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any:
"""Generates content with retry logic and error handling."""
for i in range(max_retries):
try:
model = genai.GenerativeModel(model_name)
response = model.generate_content(prompt, generation_config=generation_config)
return response
except Exception as e:
error_message = str(e)
st.warning(f"Error during generation (attempt {i + 1}/{max_retries}): {error_message}")
if "404" in error_message and "not found" in error_message:
st.error(
f"Model '{model_name}' is not available or not supported. Please select a different model."
)
return None
elif i < max_retries - 1:
st.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.")
return None
return None
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
"""Calculates the cosine similarity between two embeddings."""
return cosine_similarity(np.array(embedding1).reshape(1, -1), np.array(embedding2).reshape(1, -1))[0][0]
def create_and_train_model(
embeddings: List[List[float]],
labels: List[int],
num_classes: int,
epochs: int,
batch_size: int,
learning_rate: float,
optimizer_str: str
) -> tf.keras.Model:
"""Creates and trains a neural network for classification."""
model = Sequential([
Input(shape=(len(embeddings[0]),), # Fixed the double comma here
Dense(64, activation='relu'),
Dense(32, activation='relu'),
Dense(num_classes, activation='softmax')
])
if optimizer_str.lower() == 'adam':
optimizer = Adam(learning_rate=learning_rate)
elif optimizer_str.lower() == 'sgd':
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)
elif optimizer_str.lower() == 'rmsprop':
optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)
else:
optimizer = Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
encoded_labels = to_categorical(labels, num_classes=num_classes)
model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0)
return model
# --- RAG Question Answering ---
st.header("RAG Question Answering")
rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0)
rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0)
rag_context = st.text_area(
"Enter your context documents:",
"Relevant information to answer the question. Separate documents with newlines.",
height=150,
)
rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70)
rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100)
if st.button("Answer with RAG"):
if not rag_context or not rag_question:
st.warning("Please provide both context and a question.")
else:
with st.spinner("Generating answer..."):
try:
# 1. Generate embeddings for the context
context_embeddings = generate_embeddings(rag_context.split('\n'), rag_embedding_model)
if not context_embeddings:
st.stop()
# 2. Generate embedding for the question
question_embedding = generate_embeddings([rag_question], rag_embedding_model)
if not question_embedding:
st.stop()
# 3. Calculate similarity scores
similarities = cosine_similarity(np.array(question_embedding).reshape(1, -1), np.array(context_embeddings))[0]
# 4. Find the most relevant document(s)
most_relevant_index = np.argmax(similarities)
relevant_context = rag_context.split('\n')[most_relevant_index]
if len(relevant_context) > rag_max_context_length:
relevant_context = relevant_context[:rag_max_context_length]
# 5. Construct the prompt
rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}"
# 6. Generate the answer
response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig())
if response:
display_response(response)
except Exception as e:
st.error(f"An error occurred: {e}")
# --- Text Similarity ---
st.header("Text Similarity")
similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0)
text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70)
text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70)
if st.button("Calculate Similarity"):
if not text1 or not text2:
st.warning("Please provide both texts.")
else:
with st.spinner("Calculating similarity..."):
try:
embeddings = generate_embeddings([text1, text2], similarity_embedding_model)
if not embeddings:
st.stop()
similarity = calculate_similarity(embeddings[0], embeddings[1])
st.subheader("Cosine Similarity:")
st.write(similarity)
except Exception as e:
st.error(f"An error occurred: {e}")
# --- Neural Classification ---
st.header("Neural Classification with Embeddings")
classification_embedding_model = st.selectbox("Select embedding model for classification:", ["models/embedding-001"], index=0)
classification_data = st.text_area(
"Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1",
"text1,0\ntext2,1\ntext3,0\ntext4,1",
height=150,
)
classification_prompt = st.text_area("Enter text to classify:", "This is a test text.", height=70)
num_epochs = st.number_input("Number of Epochs", min_value=1, max_value=200, value=10, step=1)
batch_size = st.number_input("Batch Size", min_value=1, max_value=128, value=32, step=1)
learning_rate = st.number_input("Learning Rate", min_value=0.0001, max_value=0.1, value=0.0001, step=0.0001, format="%.4f")
optimizer_str = st.selectbox("Optimizer", ['adam', 'sgd', 'rmsprop'], index=0)
def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]:
"""Processes the classification data string into lists of texts and labels."""
data_pairs = [line.split(',') for line in data.split('\n') if ',' in line]
if not data_pairs:
st.error("No valid data pairs found. Please ensure each line contains 'text,label'.")
return None
texts = []
labels = []
for i, pair in enumerate(data_pairs):
if len(pair) != 2:
st.error(f"Invalid data format in line {i + 1}: '{','.join(pair)}'. Expected 'text,label'.")
return None
text = pair[0].strip()
label_str = pair[1].strip()
try:
label = int(label_str)
texts.append(text)
labels.append(label)
except ValueError:
st.error(f"Invalid label value in line {i + 1}: '{label_str}'. Label must be an integer.")
return None
return texts, labels
if st.button("Classify"):
if not classification_data or not classification_prompt:
st.warning("Please provide training data and text to classify.")
else:
with st.spinner("Classifying..."):
try:
processed_data = process_classification_data(classification_data)
if not processed_data:
st.stop()
train_texts, train_labels = processed_data
num_classes = len(set(train_labels))
train_embeddings = generate_embeddings(train_texts, classification_embedding_model)
if not train_embeddings:
st.stop()
model = create_and_train_model(
train_embeddings, train_labels, num_classes, num_epochs, batch_size, learning_rate, optimizer_str
)
predict_embedding = generate_embeddings([classification_prompt], classification_embedding_model)
if not predict_embedding:
st.stop()
prediction = model.predict(np.array([predict_embedding]), verbose=0)
predicted_class = np.argmax(prediction[0])
st.subheader("Predicted Class:")
st.write(predicted_class)
st.subheader("Prediction Probabilities:")
st.write(prediction)
except Exception as e:
st.error(f"An error occurred: {e}")