import streamlit as st
import os
import google.generativeai as genai
from dotenv import load_dotenv
import time
from typing import Any, List, Optional
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
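# Dependencies (pip package names): streamlit, google-generativeai,
# python-dotenv, numpy, scikit-learn, tensorflow.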

# Load environment variables
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Configure Generative AI model
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
else:
    st.error(
        "Google AI Studio API key not found. Please add it to your .env file. "
        "You can obtain an API key from https://makersuite.google.com/."
    )
    st.stop()

st.title("Embeddings and Vector Search Demo")
st.subheader("Explore Embeddings and Vector Databases")

# Sidebar for explanations
with st.sidebar:
    st.header("Embeddings and Vector Search")
    st.markdown(
        """
        This app demonstrates how embeddings and vector databases can be used for various tasks.
        """
    )
    st.subheader("Key Concepts:")
    st.markdown(
        """
        - **Embeddings**: Numerical representations of text, capturing semantic meaning.
        - **Vector Databases**: Databases optimized for storing and querying vectors (simulated here).
        - **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation.
        - **Cosine Similarity**: A measure of similarity between two vectors.
        - **Neural Networks**: Using embeddings as input for classification.
        """
    )
    st.subheader("Whitepaper Insights")
    st.markdown(
        """
        - Efficient similarity search using vector indexes (e.g., ANN).
        - Handling large datasets and scalability considerations.
        - Applications of embeddings: search, recommendation, classification, etc.
        """
    )

# --- Helper Functions ---
def code_block(text: str, language: str = "text") -> None:
    """Displays text as a formatted code block in Streamlit."""
    st.code(text, language=language)

def display_response(response: Any) -> None:
    """Displays the model's response."""
    if response and hasattr(response, "text"):
        st.subheader("Generated Response:")
        st.markdown(response.text)
    else:
        st.error("Failed to generate a response.")

def generate_embeddings(
    texts: List[str],
    model_name: str = "models/embedding-001",
    task_type: str = "retrieval_document",
) -> Optional[List[List[float]]]:
    """Generates embeddings for a list of texts using a specified model.

    Args:
        texts: List of text strings.
        model_name: Name of the embedding model.
        task_type: "retrieval_document" for documents, "retrieval_query" for queries.

    Returns:
        List of embeddings (list of floats) or None on error.
    """
    try:
        embeddings = []
        for text in texts:
            result = genai.embed_content(
                model=model_name,
                content=text,
                task_type=task_type,
            )
            embeddings.append(result['embedding'])
        return embeddings
    except Exception as e:
        st.error(f"Error generating embeddings with model '{model_name}': {e}")
        return None
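
# Note: models/embedding-001 produces 768-dimensional vectors; the comparisons
# below assume every embedding comes from the same model so dimensions match.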

def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any:
    """Generates content with retry logic and error handling."""
    for i in range(max_retries):
        try:
            model = genai.GenerativeModel(model_name)
            response = model.generate_content(prompt, generation_config=generation_config)
            return response
        except Exception as e:
            error_message = str(e)
            st.warning(f"Error during generation (attempt {i + 1}/{max_retries}): {error_message}")
            if "404" in error_message and "not found" in error_message:
                st.error(
                    f"Model '{model_name}' is not available or not supported. Please select a different model."
                )
                return None
            elif i < max_retries - 1:
                st.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.")
                return None
    return None
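
# Cosine similarity compares vector direction, not magnitude:
# cos(a, b) = (a . b) / (||a|| * ||b||), ranging from -1 to 1 (1 = same direction).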
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
    """Calculates the cosine similarity between two embeddings."""
    return cosine_similarity(np.array(embedding1).reshape(1, -1), np.array(embedding2).reshape(1, -1))[0][0]

def create_and_train_model(
    embeddings: List[List[float]],
    labels: List[int],
    num_classes: int,
    epochs: int,
    batch_size: int,
    learning_rate: float,
    optimizer_str: str
) -> tf.keras.Model:
    """Creates and trains a neural network for classification."""
    model = Sequential([
        Input(shape=(len(embeddings[0]),)),  # input dimension = embedding size
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(num_classes, activation='softmax')
    ])
    if optimizer_str.lower() == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_str.lower() == 'sgd':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)
    elif optimizer_str.lower() == 'rmsprop':
        optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)
    else:
        optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    encoded_labels = to_categorical(labels, num_classes=num_classes)
    model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0)
    return model
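
# The classifier maps each embedding through two ReLU layers (64 -> 32 units)
# to a softmax over num_classes; categorical cross-entropy pairs with the
# one-hot labels produced by to_categorical above.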

# --- RAG Question Answering ---
st.header("RAG Question Answering")
rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0)
rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0)
rag_context = st.text_area(
    "Enter your context documents:",
    "Relevant information to answer the question. Separate documents with newlines.",
    height=150,
)
rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70)
rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100)

if st.button("Answer with RAG"):
    if not rag_context or not rag_question:
        st.warning("Please provide both context and a question.")
    else:
        with st.spinner("Generating answer..."):
            try:
                # 1. Generate embeddings for the context (one document per line, skipping blanks)
                documents = [doc for doc in rag_context.split('\n') if doc.strip()]
                context_embeddings = generate_embeddings(documents, rag_embedding_model)
                if not context_embeddings:
                    st.stop()
                # 2. Generate embedding for the question (query task type)
                question_embedding = generate_embeddings(
                    [rag_question], rag_embedding_model, task_type="retrieval_query"
                )
                if not question_embedding:
                    st.stop()
                # 3. Calculate similarity scores
                similarities = cosine_similarity(np.array(question_embedding).reshape(1, -1), np.array(context_embeddings))[0]
                # 4. Find the most relevant document(s)
                most_relevant_index = np.argmax(similarities)
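                # Top-1 retrieval keeps the demo simple; a top-k variant could
                # take np.argsort(similarities)[-k:] and pass several documents.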
                relevant_context = documents[most_relevant_index]
                if len(relevant_context) > rag_max_context_length:
                    relevant_context = relevant_context[:rag_max_context_length]
                # 5. Construct the prompt
                rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}"
                # 6. Generate the answer
                response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig())
                if response:
                    display_response(response)
            except Exception as e:
                st.error(f"An error occurred: {e}")

# --- Text Similarity ---
st.header("Text Similarity")
similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0)
text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70)
text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70)

if st.button("Calculate Similarity"):
    if not text1 or not text2:
        st.warning("Please provide both texts.")
    else:
        with st.spinner("Calculating similarity..."):
            try:
                embeddings = generate_embeddings([text1, text2], similarity_embedding_model)
                if not embeddings:
                    st.stop()
                similarity = calculate_similarity(embeddings[0], embeddings[1])
                st.subheader("Cosine Similarity:")
                st.write(similarity)
            except Exception as e:
                st.error(f"An error occurred: {e}")

# --- Neural Classification ---
st.header("Neural Classification with Embeddings")
classification_embedding_model = st.selectbox("Select embedding model for classification:", ["models/embedding-001"], index=0)
classification_data = st.text_area(
    "Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1",
    "text1,0\ntext2,1\ntext3,0\ntext4,1",
    height=150,
)
classification_prompt = st.text_area("Enter text to classify:", "This is a test text.", height=70)
num_epochs = st.number_input("Number of Epochs", min_value=1, max_value=200, value=10, step=1)
batch_size = st.number_input("Batch Size", min_value=1, max_value=128, value=32, step=1)
learning_rate = st.number_input("Learning Rate", min_value=0.0001, max_value=0.1, value=0.0001, step=0.0001, format="%.4f")
optimizer_str = st.selectbox("Optimizer", ['adam', 'sgd', 'rmsprop'], index=0)

def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]:
    """Processes the classification data string into lists of texts and labels."""
    data_pairs = [line.split(',') for line in data.split('\n') if ',' in line]
    if not data_pairs:
        st.error("No valid data pairs found. Please ensure each line contains 'text,label'.")
        return None
    texts = []
    labels = []
    for i, pair in enumerate(data_pairs):
        if len(pair) != 2:
            st.error(f"Invalid data format in line {i + 1}: '{','.join(pair)}'. Expected 'text,label'.")
            return None
        text = pair[0].strip()
        label_str = pair[1].strip()
        try:
            label = int(label_str)
            texts.append(text)
            labels.append(label)
        except ValueError:
            st.error(f"Invalid label value in line {i + 1}: '{label_str}'. Label must be an integer.")
            return None
    return texts, labels
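
# Example: process_classification_data("good movie,1\nbad movie,0")
# -> (["good movie", "bad movie"], [1, 0])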

if st.button("Classify"):
    if not classification_data or not classification_prompt:
        st.warning("Please provide training data and text to classify.")
    else:
        with st.spinner("Classifying..."):
            try:
                processed_data = process_classification_data(classification_data)
                if not processed_data:
                    st.stop()
                train_texts, train_labels = processed_data
                # to_categorical expects labels in 0..num_classes-1
                num_classes = max(train_labels) + 1
                train_embeddings = generate_embeddings(train_texts, classification_embedding_model)
                if not train_embeddings:
                    st.stop()
                model = create_and_train_model(
                    train_embeddings, train_labels, num_classes, num_epochs, batch_size, learning_rate, optimizer_str
                )
                predict_embedding = generate_embeddings([classification_prompt], classification_embedding_model)
                if not predict_embedding:
                    st.stop()
                # generate_embeddings already returns a (1, dim) list, so no extra nesting
                prediction = model.predict(np.array(predict_embedding), verbose=0)
                predicted_class = np.argmax(prediction[0])
                st.subheader("Predicted Class:")
                st.write(predicted_class)
                st.subheader("Prediction Probabilities:")
                st.write(prediction)
            except Exception as e:
                st.error(f"An error occurred: {e}")