import logging
from itertools import combinations
from typing import Dict, List, Union

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

from .hf_embedding import generate_embeddings as generate_hf_embeddings
from .normalize_bo import normalize_particles
from .stopwords_bo import TIBETAN_STOPWORDS_SET
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE_SET


def _normalize_token_for_stopwords(token: str) -> str:
    """Normalize token by removing trailing tsek for stopword matching."""
    return token.rstrip('་')


# Attempt to import the Cython-compiled fast_lcs module
try:
    from .fast_lcs import compute_lcs_fast
    USE_CYTHON_LCS = True
except ImportError:
    # print("Cython fast_lcs not found, using Python LCS. For better performance, compile the Cython module.")
    USE_CYTHON_LCS = False

logger = logging.getLogger(__name__)


def compute_normalized_lcs(words1: List[str], words2: List[str], normalization: str = "avg") -> float:
    """
    Computes the Longest Common Subsequence (LCS) similarity between two token lists.

    Args:
        words1: First list of tokens
        words2: Second list of tokens
        normalization: How to normalize the LCS length. Options:
            'avg' - Divide by average length (default, balanced)
            'min' - Divide by shorter text (detects if one text contains the other)
            'max' - Divide by longer text (stricter, penalizes length differences)

    Returns:
        float: Normalized LCS score between 0.0 and 1.0

    Note on normalization choice:
        - 'avg': Good general-purpose choice, treats both texts equally
        - 'min': Use when looking for containment (e.g., quotes within commentary).
                 Can return 1.0 if the shorter text is fully contained in the longer one.
        - 'max': Use when you want to penalize length differences.
                 Will be lower when texts have very different lengths.
    """
    m, n = len(words1), len(words2)
    if m == 0 or n == 0:
        return 0.0

    if USE_CYTHON_LCS:
        lcs_length = compute_lcs_fast(words1, words2)
    else:
        # Pure Python implementation using dynamic programming
        dp = np.zeros((m + 1, n + 1), dtype=np.int32)
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if words1[i - 1] == words2[j - 1]:
                    dp[i, j] = dp[i - 1, j - 1] + 1
                else:
                    dp[i, j] = max(dp[i - 1, j], dp[i, j - 1])
        lcs_length = int(dp[m, n])

    # Apply selected normalization
    if normalization == "min":
        divisor = min(m, n)
    elif normalization == "max":
        divisor = max(m, n)
    else:  # "avg" (default)
        divisor = (m + n) / 2

    return lcs_length / divisor if divisor > 0 else 0.0
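
# Illustrative behaviour of the three normalizations (Latin placeholders stand in for
# Tibetan syllables; the numbers follow directly from the formula above, LCS length 2):
#
#   compute_normalized_lcs(["ka", "kha", "ga"], ["ka", "ga"], normalization="avg")  # 2 / 2.5 = 0.8
#   compute_normalized_lcs(["ka", "kha", "ga"], ["ka", "ga"], normalization="min")  # 2 / 2   = 1.0
#   compute_normalized_lcs(["ka", "kha", "ga"], ["ka", "ga"], normalization="max")  # 2 / 3   = 0.667 (rounded)
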

def compute_ngram_similarity(tokens1: List[str], tokens2: List[str], n: int = 2) -> float:
    """
    Computes syllable/token n-gram overlap similarity (Jaccard on n-grams).

    This is more effective for Tibetan than character-level fuzzy matching because
    it preserves syllable boundaries and captures local word patterns.

    Args:
        tokens1: First list of tokens (syllables or words)
        tokens2: Second list of tokens (syllables or words)
        n: Size of n-grams (default: 2 for bigrams)

    Returns:
        float: N-gram similarity score between 0.0 and 1.0
    """
    if not tokens1 or not tokens2:
        return 0.0

    # Handle edge case where text is shorter than n
    if len(tokens1) < n or len(tokens2) < n:
        # Fall back to unigram comparison
        set1, set2 = set(tokens1), set(tokens2)
        if not set1 or not set2:
            return 0.0
        intersection = len(set1 & set2)
        union = len(set1 | set2)
        return intersection / union if union > 0 else 0.0

    def get_ngrams(tokens: List[str], size: int) -> set:
        return set(tuple(tokens[i:i + size]) for i in range(len(tokens) - size + 1))

    ngrams1 = get_ngrams(tokens1, n)
    ngrams2 = get_ngrams(tokens2, n)

    intersection = len(ngrams1 & ngrams2)
    union = len(ngrams1 | ngrams2)
    return intersection / union if union > 0 else 0.0


def compute_syllable_edit_similarity(syls1: List[str], syls2: List[str]) -> float:
    """
    Computes edit distance at the syllable/token level rather than character level.

    This is more appropriate for Tibetan because:
    - Tibetan syllables are meaningful units (unlike individual characters)
    - Character-level Levenshtein over-penalizes syllable differences
    - Syllable-level comparison better captures textual variation patterns

    Args:
        syls1: First list of syllables/tokens
        syls2: Second list of syllables/tokens

    Returns:
        float: Syllable-level similarity score between 0.0 and 1.0
    """
    if not syls1 and not syls2:
        return 1.0
    if not syls1 or not syls2:
        return 0.0

    m, n = len(syls1), len(syls2)

    # Create DP table for syllable-level edit distance
    dp = np.zeros((m + 1, n + 1), dtype=np.int32)

    # Initialize base cases
    for i in range(m + 1):
        dp[i, 0] = i
    for j in range(n + 1):
        dp[0, j] = j

    # Fill DP table
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if syls1[i - 1] == syls2[j - 1]:
                dp[i, j] = dp[i - 1, j - 1]
            else:
                dp[i, j] = 1 + min(
                    dp[i - 1, j],      # deletion
                    dp[i, j - 1],      # insertion
                    dp[i - 1, j - 1],  # substitution
                )

    edit_distance = dp[m, n]
    max_len = max(m, n)
    return 1.0 - (edit_distance / max_len) if max_len > 0 else 1.0


def compute_weighted_jaccard(tokens1: List[str], tokens2: List[str]) -> float:
    """
    Computes weighted Jaccard similarity using token frequencies.

    Unlike standard Jaccard, which treats all tokens as binary (present/absent),
    this considers how often each token appears, giving more weight to frequently
    shared terms.

    Args:
        tokens1: First list of tokens
        tokens2: Second list of tokens

    Returns:
        float: Weighted Jaccard similarity between 0.0 and 1.0
    """
    from collections import Counter

    if not tokens1 or not tokens2:
        return 0.0

    c1, c2 = Counter(tokens1), Counter(tokens2)

    # Intersection: min count for each shared token
    intersection = sum((c1 & c2).values())
    # Union: max count for each token
    union = sum((c1 | c2).values())

    return intersection / union if union > 0 else 0.0
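
# Illustrative comparison on a toy pair differing by one syllable (Latin placeholders
# used instead of Tibetan syllables for readability):
#
#   a, b = ["ka", "kha", "ga", "nga"], ["ka", "kha", "ca", "nga"]
#   compute_ngram_similarity(a, b, n=2)      # 1 shared bigram of 5 distinct -> 0.2
#   compute_syllable_edit_similarity(a, b)   # 1 substitution over 4 slots   -> 0.75
#   compute_weighted_jaccard(a, b)           # 3 shared counts / 5 total     -> 0.6
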

def compute_fuzzy_similarity(words1: List[str], words2: List[str], method: str = 'ngram') -> float:
    """
    Computes fuzzy string similarity between two lists of words.

    All methods work at the syllable/token level, which is linguistically
    appropriate for Tibetan text.

    Args:
        words1: First list of tokens
        words2: Second list of tokens
        method: The fuzzy matching method to use:
            'ngram' - Syllable bigram overlap (default, recommended)
            'syllable_edit' - Syllable-level edit distance
            'weighted_jaccard' - Frequency-weighted Jaccard

    Returns:
        float: Fuzzy similarity score between 0.0 and 1.0
    """
    if not words1 or not words2:
        return 0.0

    if method == 'ngram':
        # Syllable bigram overlap - good for detecting shared phrases
        return compute_ngram_similarity(words1, words2, n=2)
    elif method == 'syllable_edit':
        # Syllable-level edit distance - good for detecting minor variations
        return compute_syllable_edit_similarity(words1, words2)
    elif method == 'weighted_jaccard':
        # Frequency-weighted Jaccard - good for repeated terms
        return compute_weighted_jaccard(words1, words2)
    else:
        # Default to ngram for any unrecognized method
        return compute_ngram_similarity(words1, words2, n=2)


def compute_semantic_similarity(
    text1_segment: str,
    text2_segment: str,
    model,
    batch_size: int = 32,
    show_progress_bar: bool = False
) -> float:
    """
    Computes semantic similarity using a Sentence Transformer model.

    Args:
        text1_segment: First text segment
        text2_segment: Second text segment
        model: Pre-loaded SentenceTransformer model
        batch_size: Batch size for encoding
        show_progress_bar: Whether to show progress bar

    Returns:
        float: Cosine similarity between embeddings (0.0 to 1.0), or np.nan on error
    """
    if model is None:
        logger.warning(
            "Embedding model not available for semantic similarity. Skipping calculation."
        )
        return np.nan

    if not text1_segment or not text2_segment:
        logger.info(
            "One or both texts are empty for semantic similarity. Returning 0.0."
        )
        return 0.0

    def _get_embedding(raw_text: str) -> Union[np.ndarray, None]:
        """Helper to get embedding for a single text."""
        if not raw_text.strip():
            logger.info("Text is empty or whitespace. Returning None.")
            return None
        embedding = generate_hf_embeddings(
            texts=[raw_text],
            model=model,
            batch_size=batch_size,
            show_progress_bar=show_progress_bar
        )
        if embedding is None or embedding.size == 0:
            logger.error(f"Failed to generate embedding for text: {raw_text[:100]}...")
            return None
        return embedding

    try:
        emb1 = _get_embedding(text1_segment)
        emb2 = _get_embedding(text2_segment)

        if emb1 is None or emb2 is None or emb1.size == 0 or emb2.size == 0:
            logger.error(
                "Failed to obtain one or both embeddings for semantic similarity."
            )
            return np.nan

        # Ensure embeddings are numpy arrays (should be, but defensive)
        if not isinstance(emb1, np.ndarray):
            emb1 = np.array(emb1)
        if not isinstance(emb2, np.ndarray):
            emb2 = np.array(emb2)

        # Handle cases where embeddings are all zeros
        if np.all(emb1 == 0) and np.all(emb2 == 0):
            logger.info("Both embeddings are zero. Semantic similarity is 0.0.")
            return 0.0
        if np.all(emb1 == 0) or np.all(emb2 == 0):
            logger.info("One of the embeddings is zero. Semantic similarity is 0.0.")
            return 0.0

        # Handle NaN or Inf in embeddings
        if np.isnan(emb1).any() or np.isinf(emb1).any() or \
           np.isnan(emb2).any() or np.isinf(emb2).any():
            logger.warning("NaN or Inf found in embeddings. Semantic similarity set to 0.0.")
            return 0.0

        # Ensure embeddings are 2D for cosine_similarity: [1, dim]
        if emb1.ndim == 1:
            emb1 = emb1.reshape(1, -1)
        if emb2.ndim == 1:
            emb2 = emb2.reshape(1, -1)

        similarity_score = cosine_similarity(emb1, emb2)[0][0]
        return max(0.0, float(similarity_score))

    except Exception as e:
        safe_text1 = str(text1_segment)[:100] if text1_segment is not None else "N/A"
        safe_text2 = str(text2_segment)[:100] if text2_segment is not None else "N/A"
        logger.error(
            f"Error during semantic similarity calculation:\nText1: {safe_text1}...\nText2: {safe_text2}...\nError: {e}"
        )
        logger.exception("Traceback for semantic similarity calculation error:")
        return np.nan
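
# Minimal usage sketch (illustrative only; the model name is a placeholder - any
# SentenceTransformer-style model accepted by generate_embeddings() in .hf_embedding
# works the same way, and the caller is responsible for loading it):
#
#   from sentence_transformers import SentenceTransformer
#   model = SentenceTransformer("some/tibetan-capable-model")  # placeholder name
#   score = compute_semantic_similarity(text_a, text_b, model)  # 0.0-1.0, or np.nan on error
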
Semantic similarity set to 0.0.") return 0.0 # Ensure embeddings are 2D for cosine_similarity: [1, dim] if emb1.ndim == 1: emb1 = emb1.reshape(1, -1) if emb2.ndim == 1: emb2 = emb2.reshape(1, -1) similarity_score = cosine_similarity(emb1, emb2)[0][0] return max(0.0, float(similarity_score)) except Exception as e: safe_text1 = str(text1_segment)[:100] if text1_segment is not None else "N/A" safe_text2 = str(text2_segment)[:100] if text2_segment is not None else "N/A" logger.error( f"Error during semantic similarity calculation:\nText1: {safe_text1}...\nText2: {safe_text2}...\nError: {e}" ) logger.exception("Traceback for semantic similarity calculation error:") return np.nan def compute_all_metrics( texts: Dict[str, str], token_lists: Dict[str, List[str]], model=None, enable_semantic: bool = True, enable_fuzzy: bool = True, fuzzy_method: str = 'token_set', lcs_normalization: str = 'avg', use_stopwords: bool = True, use_lite_stopwords: bool = False, normalize_particles_opt: bool = False, batch_size: int = 32, show_progress_bar: bool = False ) -> pd.DataFrame: """ Computes all selected similarity metrics between pairs of texts. Args: texts (Dict[str, str]): A dictionary where keys are text identifiers (e.g., filenames or segment IDs) and values are the text content strings. token_lists (Dict[str, List[str]]): Pre-tokenized text for each text identifier. model (SentenceTransformer, optional): The pre-loaded sentence transformer model. Defaults to None. enable_semantic (bool): Whether to compute semantic similarity. Defaults to True. enable_fuzzy (bool): Whether to compute fuzzy string similarity. Defaults to True. fuzzy_method (str): The fuzzy matching method to use ('ngram', 'syllable_edit', 'weighted_jaccard'). Defaults to 'token_set'. lcs_normalization (str): How to normalize LCS ('avg', 'min', 'max'). Defaults to 'avg'. use_stopwords (bool): Whether to filter stopwords for Jaccard similarity. Defaults to True. use_lite_stopwords (bool): Whether to use the lite version of stopwords. Defaults to False. normalize_particles_opt (bool): Whether to normalize grammatical particles (གི/ཀྱི/གྱི → གི). Reduces false negatives from sandhi variation. Defaults to False. batch_size (int): Batch size for semantic similarity computation. Defaults to 32. show_progress_bar (bool): Whether to show progress bar for semantic similarity. Defaults to False. Returns: pd.DataFrame: A DataFrame where each row contains the metrics for a pair of texts, including 'Text Pair', 'Jaccard Similarity (%)', 'Normalized LCS', 'Fuzzy Similarity', and 'Semantic Similarity'. 
""" files = list(texts.keys()) results = [] for i, j in combinations(range(len(files)), 2): f1, f2 = files[i], files[j] words1_raw, words2_raw = token_lists[f1], token_lists[f2] # Select appropriate stopwords set based on user preference if use_stopwords: # Choose between regular and lite stopwords sets if use_lite_stopwords: stopwords_set_to_use = TIBETAN_STOPWORDS_LITE_SET else: stopwords_set_to_use = TIBETAN_STOPWORDS_SET else: # If stopwords are disabled, use an empty set stopwords_set_to_use = set() # Filter stopwords for Jaccard calculation (normalize tokens for consistent matching) words1_filtered = [word for word in words1_raw if _normalize_token_for_stopwords(word) not in stopwords_set_to_use] words2_filtered = [word for word in words2_raw if _normalize_token_for_stopwords(word) not in stopwords_set_to_use] # Apply particle normalization if enabled if normalize_particles_opt: words1_jaccard = normalize_particles(words1_filtered) words2_jaccard = normalize_particles(words2_filtered) words1_lcs = normalize_particles(words1_raw) words2_lcs = normalize_particles(words2_raw) else: words1_jaccard = words1_filtered words2_jaccard = words2_filtered words1_lcs = words1_raw words2_lcs = words2_raw jaccard = ( len(set(words1_jaccard) & set(words2_jaccard)) / len(set(words1_jaccard) | set(words2_jaccard)) if set(words1_jaccard) | set(words2_jaccard) # Ensure denominator is not zero else 0.0 ) jaccard_percent = jaccard * 100.0 # LCS uses tokens (with optional particle normalization) norm_lcs = compute_normalized_lcs(words1_lcs, words2_lcs, normalization=lcs_normalization) # Fuzzy Similarity Calculation if enable_fuzzy: fuzzy_sim = compute_fuzzy_similarity(words1_jaccard, words2_jaccard, method=fuzzy_method) else: fuzzy_sim = np.nan # Semantic Similarity Calculation if enable_semantic: semantic_sim = compute_semantic_similarity( texts[f1], texts[f2], model, batch_size=batch_size, show_progress_bar=show_progress_bar ) else: semantic_sim = np.nan results.append( { "Text Pair": f"{f1} vs {f2}", "Jaccard Similarity (%)": jaccard_percent, "Normalized LCS": norm_lcs, "Fuzzy Similarity": fuzzy_sim, "Semantic Similarity": semantic_sim } ) return pd.DataFrame(results)