import streamlit as st import os import google.generativeai as genai from dotenv import load_dotenv import time from typing import Any, List, Optional import numpy as np from sklearn.metrics.pairwise import cosine_similarity import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense, Input from tensorflow.keras.utils import to_categorical from tensorflow.keras.optimizers import Adam # Load environment variables load_dotenv() GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") # Configure Generative AI model if GOOGLE_API_KEY: genai.configure(api_key=GOOGLE_API_KEY) else: st.error( "Google AI Studio API key not found. Please add it to your .env file. " "You can obtain an API key from https://makersuite.google.com/." ) st.stop() st.title("Embeddings and Vector Search Demo") st.subheader("Explore Embeddings and Vector Databases") # Sidebar for explanations with st.sidebar: st.header("Embeddings and Vector Search") st.markdown( """ This app demonstrates how embeddings and vector databases can be used for various tasks. """ ) st.subheader("Key Concepts:") st.markdown( """ - **Embeddings**: Numerical representations of text, capturing semantic meaning. - **Vector Databases**: Databases optimized for storing and querying vectors (simulated here). - **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation. - **Cosine Similarity**: A measure of similarity between two vectors. - **Neural Networks**: Using embeddings as input for classification. """ ) st.subheader("Whitepaper Insights") st.markdown( """ - Efficient similarity search using vector indexes (e.g., ANN). - Handling large datasets and scalability considerations. - Applications of embeddings: search, recommendation, classification, etc. """ ) # --- Helper Functions --- def code_block(text: str, language: str = "text") -> None: """Displays text as a formatted code block in Streamlit.""" st.markdown(f"```{language}\n{text}\n```", unsafe_allow_html=True) def display_response(response: Any) -> None: """Displays the model's response.""" if response and hasattr(response, "text"): st.subheader("Generated Response:") st.markdown(response.text) else: st.error("Failed to generate a response.") def generate_embeddings(texts: List[str], model_name: str = "models/embedding-001") -> Optional[List[List[float]]]: """Generates embeddings for a list of texts using a specified model. Args: texts: List of text strings. model_name: Name of the embedding model. Returns: List of embeddings (list of floats) or None on error. """ try: # Use the embedding model directly embeddings = [] for text in texts: result = genai.embed_content( model=model_name, content=text, task_type="retrieval_document" # or "retrieval_query" for queries ) embeddings.append(result['embedding']) return embeddings except Exception as e: st.error(f"Error generating embeddings with model '{model_name}': {e}") return None def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any: """Generates content with retry logic and error handling.""" for i in range(max_retries): try: model = genai.GenerativeModel(model_name) response = model.generate_content(prompt, generation_config=generation_config) return response except Exception as e: error_message = str(e) st.warning(f"Error during generation (attempt {i + 1}/{max_retries}): {error_message}") if "404" in error_message and "not found" in error_message: st.error( f"Model '{model_name}' is not available or not supported. Please select a different model." ) return None elif i < max_retries - 1: st.info(f"Retrying in {delay} seconds...") time.sleep(delay) else: st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.") return None return None def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float: """Calculates the cosine similarity between two embeddings.""" return cosine_similarity(np.array(embedding1).reshape(1, -1), np.array(embedding2).reshape(1, -1))[0][0] def create_and_train_model( embeddings: List[List[float]], labels: List[int], num_classes: int, epochs: int, batch_size: int, learning_rate: float, optimizer_str: str ) -> tf.keras.Model: """Creates and trains a neural network for classification.""" model = Sequential([ Input(shape=(len(embeddings[0]),), # Fixed the double comma here Dense(64, activation='relu'), Dense(32, activation='relu'), Dense(num_classes, activation='softmax') ]) if optimizer_str.lower() == 'adam': optimizer = Adam(learning_rate=learning_rate) elif optimizer_str.lower() == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate) elif optimizer_str.lower() == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate) else: optimizer = Adam(learning_rate=learning_rate) model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy']) encoded_labels = to_categorical(labels, num_classes=num_classes) model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0) return model # --- RAG Question Answering --- st.header("RAG Question Answering") rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0) rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0) rag_context = st.text_area( "Enter your context documents:", "Relevant information to answer the question. Separate documents with newlines.", height=150, ) rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70) rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100) if st.button("Answer with RAG"): if not rag_context or not rag_question: st.warning("Please provide both context and a question.") else: with st.spinner("Generating answer..."): try: # 1. Generate embeddings for the context context_embeddings = generate_embeddings(rag_context.split('\n'), rag_embedding_model) if not context_embeddings: st.stop() # 2. Generate embedding for the question question_embedding = generate_embeddings([rag_question], rag_embedding_model) if not question_embedding: st.stop() # 3. Calculate similarity scores similarities = cosine_similarity(np.array(question_embedding).reshape(1, -1), np.array(context_embeddings))[0] # 4. Find the most relevant document(s) most_relevant_index = np.argmax(similarities) relevant_context = rag_context.split('\n')[most_relevant_index] if len(relevant_context) > rag_max_context_length: relevant_context = relevant_context[:rag_max_context_length] # 5. Construct the prompt rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}" # 6. Generate the answer response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig()) if response: display_response(response) except Exception as e: st.error(f"An error occurred: {e}") # --- Text Similarity --- st.header("Text Similarity") similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0) text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70) text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70) if st.button("Calculate Similarity"): if not text1 or not text2: st.warning("Please provide both texts.") else: with st.spinner("Calculating similarity..."): try: embeddings = generate_embeddings([text1, text2], similarity_embedding_model) if not embeddings: st.stop() similarity = calculate_similarity(embeddings[0], embeddings[1]) st.subheader("Cosine Similarity:") st.write(similarity) except Exception as e: st.error(f"An error occurred: {e}") # --- Neural Classification --- st.header("Neural Classification with Embeddings") classification_embedding_model = st.selectbox("Select embedding model for classification:", ["models/embedding-001"], index=0) classification_data = st.text_area( "Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1", "text1,0\ntext2,1\ntext3,0\ntext4,1", height=150, ) classification_prompt = st.text_area("Enter text to classify:", "This is a test text.", height=70) num_epochs = st.number_input("Number of Epochs", min_value=1, max_value=200, value=10, step=1) batch_size = st.number_input("Batch Size", min_value=1, max_value=128, value=32, step=1) learning_rate = st.number_input("Learning Rate", min_value=0.0001, max_value=0.1, value=0.0001, step=0.0001, format="%.4f") optimizer_str = st.selectbox("Optimizer", ['adam', 'sgd', 'rmsprop'], index=0) def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]: """Processes the classification data string into lists of texts and labels.""" data_pairs = [line.split(',') for line in data.split('\n') if ',' in line] if not data_pairs: st.error("No valid data pairs found. Please ensure each line contains 'text,label'.") return None texts = [] labels = [] for i, pair in enumerate(data_pairs): if len(pair) != 2: st.error(f"Invalid data format in line {i + 1}: '{','.join(pair)}'. Expected 'text,label'.") return None text = pair[0].strip() label_str = pair[1].strip() try: label = int(label_str) texts.append(text) labels.append(label) except ValueError: st.error(f"Invalid label value in line {i + 1}: '{label_str}'. Label must be an integer.") return None return texts, labels if st.button("Classify"): if not classification_data or not classification_prompt: st.warning("Please provide training data and text to classify.") else: with st.spinner("Classifying..."): try: processed_data = process_classification_data(classification_data) if not processed_data: st.stop() train_texts, train_labels = processed_data num_classes = len(set(train_labels)) train_embeddings = generate_embeddings(train_texts, classification_embedding_model) if not train_embeddings: st.stop() model = create_and_train_model( train_embeddings, train_labels, num_classes, num_epochs, batch_size, learning_rate, optimizer_str ) predict_embedding = generate_embeddings([classification_prompt], classification_embedding_model) if not predict_embedding: st.stop() prediction = model.predict(np.array([predict_embedding]), verbose=0) predicted_class = np.argmax(prediction[0]) st.subheader("Predicted Class:") st.write(predicted_class) st.subheader("Prediction Probabilities:") st.write(prediction) except Exception as e: st.error(f"An error occurred: {e}")