import os
import uuid
import json
from pathlib import Path
from typing import List
from datetime import datetime

from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.embeddings.base import Embeddings
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer

current_dir = Path(__file__).resolve().parent


class SentenceTransfmEmbeddings(Embeddings):
    """Sentence Transformers embedding class"""

    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents"""
        try:
            embeddings = self.model.encode(texts)
            return embeddings.tolist()
        except Exception as e:
            print(f"Error embedding documents: {e}")
            # Return dummy embeddings to prevent crash
            return [[0.0] * 768 for _ in texts]

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query"""
        try:
            embedding = self.model.encode([text])
            return embedding[0].tolist()
        except Exception as e:
            print(f"Error embedding query: {e}")
            return [0.0] * 768


class DataIndexer:

    source_file = os.path.join(current_dir, 'sources.txt')

    def __init__(self, index_name='langchain-repo') -> None:
        # TODO: choose your embedding model
        self.embedding_client = InferenceClient(
            # "dunzhang/stella_en_1.5B_v5",
            "sentence-transformers/all-mpnet-base-v2",
            token=os.environ['HF_TOKEN'],
        )
        self.embeddings = SentenceTransfmEmbeddings(
            "sentence-transformers/all-mpnet-base-v2"
        )
        # self.embeddings = HuggingFaceEmbeddings(
        #     model_name="sentence-transformers/all-mpnet-base-v2"
        # )
        self.spec = ServerlessSpec(cloud='aws', region='us-east-1')
        # self.embedding_client = OpenAIEmbeddings()
        self.index_name = index_name
        self.pinecone_client = Pinecone(api_key=os.environ.get('PINECONE_API_KEY'))

        if index_name not in self.pinecone_client.list_indexes().names():
            # TODO: create your index if it doesn't exist. Use the create_index function.
            # Make sure to choose the dimension that corresponds to your embedding model.
            self.pinecone_client.create_index(
                name=index_name,
                dimension=768,
                metric='cosine',
                spec=self.spec,
            )

        self.index = self.pinecone_client.Index(self.index_name)
        # TODO: make sure to build the index.
        # with open(self.source_file, 'r') as file:
        #     sources = file.readlines()
        # sources = [s.strip() for s in sources if s.strip()]
        # if not sources:
        #     self.source_index = None
        # else:
        #     self.source_index = self.get_source_index()
        self.source_index = None

    def get_source_index(self):
        if not os.path.isfile(self.source_file):
            print('No source file')
            return None
        print('create source index')
        with open(self.source_file, 'r') as file:
            sources = file.readlines()
        sources = [s.strip() for s in sources if s.strip()]
        if not sources:
            print("No valid sources to index")
            return None
        print("sources are:", sources)
        embeddings = self.embeddings.embed_documents(sources)
        print(f"Generated {len(embeddings)} embeddings for {len(sources)} sources")
        vectorstore = Chroma.from_texts(sources, embedding=self.embeddings)
        return vectorstore

    def index_data(self, docs, batch_size=32):
        with open(self.source_file, 'a') as file:
            for doc in docs:
                file.write(doc.metadata['source'] + '\n')
        self.source_index = self.get_source_index()

        for i in range(0, len(docs), batch_size):
            batch = docs[i: i + batch_size]

            # TODO: create a list of the vector representations of each text data in the batch
            # TODO: choose your embedding model
            # values = self.embedding_client.embed_documents([
            #     doc.page_content for doc in batch
            # ])
            values = self.embedding_client.feature_extraction([
                doc.page_content for doc in batch
            ])

            # TODO: create a list of unique identifiers for each element in the batch with the uuid package.
            vector_ids = [str(uuid.uuid4()) for _ in batch]

            # TODO: create a list of dictionaries representing the metadata. Capture the text data
            # with the "text" key, and make sure to capture the rest of the doc.metadata.
            metadatas = [{
                "text": doc.page_content,
                **(doc.metadata if doc.metadata else {}),
            } for doc in batch]

            # Create a list of dictionaries with keys "id" (the unique identifiers), "values"
            # (the vector representation), and "metadata" (the metadata).
            vectors = [{
                'id': vector_id,
                'values': value,
                'metadata': metadata,
            } for vector_id, value, metadata in zip(vector_ids, values, metadatas)]

            for v in vectors[:5]:
                print("Metadata:", v['metadata'])

            try:
                # TODO: Use the function upsert to upload the data to the database.
                upsert_response = self.index.upsert(vectors)
                print(f"Successfully indexed batch: {upsert_response}")
            except Exception as e:
                print(e)

    def search(self, text_query, top_k=5, hybrid_search=False):
        filter = None
        if hybrid_search and self.source_index:
            # Pull the 50 most relevant file names for the question and restrict the
            # Pinecone query to those sources. Adjust this number as you see fit.
            source_docs = self.source_index.similarity_search(text_query, 50)
            filter = {"source": {"$in": [doc.page_content for doc in source_docs]}}

        # TODO: embed the text_query by using the embedding model
        # TODO: choose your embedding model
        # vector = self.embedding_client.feature_extraction(text_query)
        try:
            print("search query:", text_query)
            vector = self.embedding_client.feature_extraction(text=text_query)
            if vector is None:
                print("Failed to embed the text query for the Pinecone search")
                return []

            result = self.index.query(
                vector=vector,
                filter=filter,
                top_k=top_k,
                include_values=True,
                include_metadata=True,
            )
            print(f"Query successful for the question: {text_query}")
            docs = []
            for res in result["matches"]:
                # TODO: From the result's metadata, extract the "text" element.
print("results filename:",res['metadata']['file_name']) print("result score:",res['score']) if res['score']>0.540: docs.append(res['metadata']['text']) # pass # print("docs: ",docs[0]) return docs except Exception as e: print(f"error in search:{e}") return [] # TODO: use the vector representation of the text_query to # search the database by using the query function. if __name__ == '__main__': from langchain_community.document_loaders import GitLoader from langchain_text_splitters import ( Language, RecursiveCharacterTextSplitter, ) print("start:", datetime.now()) loader = GitLoader( clone_url="https://github.com/langchain-ai/langchain", repo_path="./code_data/langchain_repo/", branch="master", ) python_splitter = RecursiveCharacterTextSplitter.from_language( language=Language.PYTHON, chunk_size=10000, chunk_overlap=100 ) docs = loader.load() docs = [doc for doc in docs if doc.metadata['file_type'] in ['.py', '.md']] docs = [doc for doc in docs if len(doc.page_content) < 50000] docs = python_splitter.split_documents(docs) for doc in docs: doc.page_content = '# {}\n\n'.format(doc.metadata['source']) + doc.page_content print("before instacing the indexer:", datetime.now()) indexer = DataIndexer() print("after instacing the indexer:", datetime.now()) with open('./app/sources.txt', 'a') as file: for doc in docs: file.writelines(doc.metadata['source'] + '\n') print("after writing the indexer:", datetime.now()) indexer.index_data(docs) print("end:", datetime.now()) # ###### test ########### # test_docs = docs[:2] # Just try first two documents # print("\nTest Document Details:") # print(f"Number of test documents: {len(test_docs)}") # for idx, doc in enumerate(test_docs): # print(f"\nDocument {idx + 1}:") # print(f"Content length: {len(doc.page_content)}") # # print(f"First 100 chars: {doc.page_content[:100]}") # print(f"Metadata: {doc.metadata}") # # try: # print("\nInitializing DataIndexer...") # indexer = DataIndexer() # print("\nStarting indexing...") # indexer.index_data(test_docs) # print("Test indexing successful") # # except Exception as e: # # print(f"Test indexing failed: {str(e)}")