"""Retrieval functions for embeddings and search."""

from typing import Any, List

import numpy as np
import pandas as pd
from scipy import spatial


def create_embeddings_batch(client,
                            deployment_name: str,
                            batch_size: int,
                            df: pd.DataFrame,
                            chunk_column: str,
                            cost_per_thousand_tokens: float = 0.000125
                            ) -> tuple[pd.DataFrame, float]:
    """
    Creates embeddings for the text data in the given dataframe, in batches, and computes
    the total cost of the embedding operation.

    This function processes the text in the specified chunk column of the dataframe, generates
    embeddings batch by batch, appends the embeddings to the dataframe, and calculates the cost
    incurred based on the number of tokens processed and the cost per thousand tokens.

    :param client: The client object used for generating embeddings.
    :param deployment_name: The name of the deployed model used to create embeddings.
    :param batch_size: The number of rows to process in each batch.
    :param df: The pandas DataFrame containing the text to embed.
    :param chunk_column: The column in the dataframe containing the text data to embed.
    :param cost_per_thousand_tokens: The cost per thousand tokens used in calculating the
        total cost. Defaults to 0.000125.
    :return: A tuple of the updated dataframe with embeddings and the computed total cost.
    :raises ValueError: If `df` is not a pandas DataFrame, `chunk_column` is not present in
        the dataframe, or `batch_size` is not positive.
    :raises RuntimeError: If an exception occurs during the embedding creation process.
    """
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input df must be a pandas DataFrame")

    if chunk_column not in df.columns:
        raise ValueError(f"Column {chunk_column} not found in dataframe")

    if batch_size < 1:
        raise ValueError("batch_size must be positive")

    embeddings = []
    total_tokens = 0

    try:
        # Walk the dataframe in slices of batch_size rows.
        for i in range(0, len(df), batch_size):
            batch_texts = df.iloc[i:i + batch_size][chunk_column].tolist()

            for text in batch_texts:
                # Skip empty or non-string entries, keeping row and embedding counts aligned.
                if not text or not isinstance(text, str):
                    embeddings.append([])
                    continue

                # Newlines can degrade embedding quality, so flatten them first.
                emb_response = client.embeddings.create(
                    model=deployment_name,
                    input=text.replace("\n", " ")
                )

                if (emb_response and hasattr(emb_response, 'data') and
                        emb_response.data and len(emb_response.data) > 0 and
                        hasattr(emb_response.data[0], 'embedding')):
                    embedding = emb_response.data[0].embedding
                    embeddings.append(embedding)

                    # Accumulate token usage for the cost calculation below.
                    if hasattr(emb_response, 'usage') and hasattr(emb_response.usage, 'prompt_tokens'):
                        total_tokens += emb_response.usage.prompt_tokens
                else:
                    print(f"Warning: Invalid embedding response for text: {text[:50]}...")
                    embeddings.append([])

        df["embeddings"] = embeddings

        cost = (total_tokens / 1_000) * cost_per_thousand_tokens

        return df, cost

    except Exception as e:
        raise RuntimeError(f"Failed to create embeddings: {e}") from e
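

# A minimal usage sketch for create_embeddings_batch. The OpenAI client setup
# and model name below are illustrative assumptions, not part of this module:
#
#     from openai import OpenAI
#
#     client = OpenAI()
#     chunks = pd.DataFrame({"text": ["first chunk of text", "second chunk"]})
#     chunks, cost = create_embeddings_batch(
#         client,
#         deployment_name="text-embedding-3-small",
#         batch_size=16,
#         df=chunks,
#         chunk_column="text",
#     )
#     print(f"Embedded {len(chunks)} rows at an estimated cost of ${cost:.6f}")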


def get_embedding(text: str, client: Any, model="text-embedding-3-small", **kwargs) -> List[float]:
    """
    Computes the embedding vector for a given text using the specified client and model. The
    function replaces newline characters in the input text to mitigate potential negative
    effects on performance and then uses the provided client to generate the embedding.

    :param text: The input text to be embedded.
    :param client: The client instance used for generating the embedding.
    :param model: The model identifier specifying the embedding model to be used. Defaults to
        "text-embedding-3-small".
    :param kwargs: Additional parameters passed to the client's embedding creation method.
    :return: The embedding vector of the input text as a list of floats.
    :rtype: List[float]
    """
    text = text.replace("\n", " ")
    response = client.embeddings.create(input=[text], model=model, **kwargs)
    return response.data[0].embedding
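

# Example (a sketch that assumes the same OpenAI-style client as above):
#
#     vector = get_embedding("What is retrieval-augmented generation?", client)
#     print(len(vector))  # dimensionality of the embedding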


def cosine_distance(a, b):
    """
    Computes the cosine distance between two vectors.

    Parameters:
        a: First vector
        b: Second vector

    Returns:
        Cosine distance between the input vectors, as a float.
    """
    # Cosine distance is 1 minus the cosine similarity, i.e. 1 minus the dot
    # product of the vectors normalized by the product of their magnitudes.
    return 1 - (np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
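

# Quick sanity check: orthogonal vectors are at distance 1, and vectors
# pointing the same way are at distance 0:
#
#     cosine_distance([1, 0], [0, 1])  # -> 1.0
#     cosine_distance([1, 0], [2, 0])  # -> 0.0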


def distances_from_embeddings(
        query_embedding: List[float],
        embeddings: List[List[float]],
        distance_metric="cosine",
) -> List[float]:
    """Return the distances between a query embedding and a list of embeddings."""
    distance_metrics = {
        "cosine": spatial.distance.cosine,
        "L1": spatial.distance.cityblock,
        "L2": spatial.distance.euclidean,
        "Linf": spatial.distance.chebyshev,
    }
    distances = [
        distance_metrics[distance_metric](query_embedding, embedding)
        for embedding in embeddings
    ]
    return distances
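

# Example with toy vectors, comparing a query against two stored embeddings
# under two of the supported metrics:
#
#     query = [1.0, 0.0]
#     stored = [[1.0, 0.0], [0.0, 1.0]]
#     distances_from_embeddings(query, stored)                        # cosine: [0.0, 1.0]
#     distances_from_embeddings(query, stored, distance_metric="L2")  # [0.0, 1.4142...]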


def search_text(df, embs_query, n=0, cosine='distance'):
    """
    Search for similar texts using embeddings.

    Parameters:
        df: DataFrame containing the embeddings
        embs_query: Query embedding
        n: Number of results to return (0 for all)
        cosine: Whether to rank by cosine 'distance' (ascending) or
            cosine 'similarity' (descending)

    Returns:
        DataFrame with results sorted from most to least similar
    """
    if cosine == 'similarity':
        df["similarity"] = df.embeddings.apply(
            lambda x: 1 - cosine_distance(x, embs_query)
        )
        # Higher similarity means a closer match, so sort descending.
        results = df.sort_values("similarity", ascending=False)
    elif cosine == 'distance':
        df["distance"] = df.embeddings.apply(
            lambda x: cosine_distance(x, embs_query)
        )
        # Lower distance means a closer match, so sort ascending.
        results = df.sort_values("distance", ascending=True)
    else:
        raise ValueError("cosine must be either 'distance' or 'similarity'")

    return results if n == 0 else results.head(n)
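

# A usage sketch tying get_embedding and search_text together; the client and
# the sample frame are illustrative:
#
#     corpus = pd.DataFrame({"text": ["cats purr", "dogs bark"]})
#     corpus["embeddings"] = corpus["text"].apply(lambda t: get_embedding(t, client))
#     query = get_embedding("feline sounds", client)
#     top = search_text(corpus, query, n=1)  # row with the smallest cosine distance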


def control_chunk_context(chunks_sorted_df,
                          current_token_count,
                          max_token_count,
                          tokenizer
                          ):
    """
    Adds chunks of text to the context while respecting the maximum token count limit.

    This function iterates through a DataFrame containing sorted text chunks, calculates
    the token count for each chunk, and appends chunks to a context list as long as
    the total token count does not exceed the specified maximum. It stops adding chunks
    once the next chunk would exceed the token limit.

    :param chunks_sorted_df: The DataFrame containing text chunks sorted in a specific order.
    :type chunks_sorted_df: pandas.DataFrame
    :param current_token_count: The token count already consumed before adding any chunk.
    :type current_token_count: int
    :param max_token_count: The maximum allowable token count for the context.
    :type max_token_count: int
    :param tokenizer: The tokenizer used to encode text into tokens, e.g. the encoding
        for the "gpt-4o" model.
    :type tokenizer: Callable
    :return: The list of text chunks that fit within the token limit.
    :rtype: list
    """
    context = []
    for text in chunks_sorted_df["text"].values:
        text_token_count = len(tokenizer.encode(text))

        if current_token_count + text_token_count <= max_token_count:
            context.append(text)
            current_token_count += text_token_count
        else:
            # The next chunk would push the context over the limit; stop here.
            break

    return context
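

# A usage sketch with tiktoken as the tokenizer; `chunks_sorted` is assumed to
# be the output of search_text (most similar rows first) with a "text" column:
#
#     import tiktoken
#
#     tokenizer = tiktoken.encoding_for_model("gpt-4o")
#     context = control_chunk_context(chunks_sorted,
#                                     current_token_count=200,
#                                     max_token_count=4000,
#                                     tokenizer=tokenizer)
#     prompt_context = "\n\n".join(context)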