#!/usr/bin/env python3 """ Entropy-Based Radial Chunking Search Engine This module implements an advanced search engine that uses high-entropy tokens as semantic centers for intelligent text chunking and retrieval. The system employs semantic binary search and adaptive chunking strategies for efficient information retrieval. The algorithm works by: 1. Calculating entropy for each token based on its embedding 2. Identifying high-entropy tokens as semantic centers 3. Creating chunks around these centers 4. Performing semantic search using binary search in the semantic space Module: rag.py Author: Aninokuma from Stealth Hut Date: 2025 Version: 1.0.0 License: MIT """ import sqlite3 import numpy as np from safetensors import safe_open from transformers import AutoTokenizer import time import os from collections import defaultdict, Counter import math from contextlib import contextmanager from typing import List, Dict, Tuple, Optional, Any class EntropyRadialSearch: """Search engine with entropy-based radial chunking This class implements an advanced search system that uses high-entropy tokens as semantic centers to create meaningful text chunks. The system provides efficient retrieval through semantic binary search and adaptive chunking strategies. Attributes: tokenizer: Pre-trained tokenizer for tokenizing text embeddings: Token embeddings loaded from the model db_path: Path to the SQLite database for storing chunks and metadata """ def __init__(self, model_dir: str = "qwen3_int8_harmonic", db_path: str = "entropy_radial_search.db"): """Initialize the EntropyRadialSearch instance. Args: model_dir: Directory containing the pre-trained model files db_path: Path to the SQLite database file """ self._log_message("Loading Entropy-Based Radial Search Engine") # Load model self.tokenizer = AutoTokenizer.from_pretrained(model_dir) tensors = {} with safe_open(f"{model_dir}/model.safetensors", framework="numpy") as f: for key in f.keys(): tensors[key] = f.get_tensor(key) self.embeddings = tensors["embeddings"] self.weights = tensors["weights"] # Load the weights tensor for rarity computation self._log_message(f"Model loaded: {len(self.embeddings)} tokens, weights tensor: {len(self.weights)} values") # Initialize database self.db_path = db_path self.init_database() self._log_message(f"Database ready: {db_path}") def _log_message(self, message: str) -> None: """Log a message to standard output. Args: message: The message to log """ print(f"[LOG] {message}") @contextmanager def get_connection(self): """SQLite connection context manager Provides a database connection that is automatically closed after use. Yields: sqlite3.Connection: Database connection object """ conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def init_database(self) -> None: """Initialize database with enhanced schema Creates the necessary tables and indexes for storing document chunks, entropy analysis data, and entropy centers. This method ensures all required database structures exist before the search engine is used. """ try: with self.get_connection() as conn: # Documents table with enhanced chunk info conn.execute(""" CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, chunk_id INTEGER, content TEXT, token_start INTEGER, token_end INTEGER, center_token TEXT, entropy_score REAL, radius INTEGER, adaptive_radius BOOLEAN DEFAULT FALSE, local_density REAL DEFAULT 0.0, fallback_chunk BOOLEAN DEFAULT FALSE, embedding BLOB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Entropy analysis table conn.execute(""" CREATE TABLE IF NOT EXISTS entropy_analysis ( filename TEXT PRIMARY KEY, total_tokens INTEGER, total_chunks INTEGER, entropy_threshold REAL, high_entropy_tokens INTEGER, avg_chunk_size REAL, processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # High-entropy centers table conn.execute(""" CREATE TABLE IF NOT EXISTS entropy_centers ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT, token_position INTEGER, token_text TEXT, entropy_score REAL, chunk_assigned BOOLEAN DEFAULT FALSE ) """) # Indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_chunk_id ON documents(chunk_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_entropy_score ON documents(entropy_score)") conn.commit() except sqlite3.Error as e: self._log_message(f"Database initialization error: {e}") raise def calculate_token_entropy(self, tokens: List[int]) -> List[Tuple[int, str, float]]: """Calculate entropy for each token based on vector embedding entropy. This method calculates entropy scores for tokens using both Shannon entropy of normalized vector components and vector variance to identify high-entropy tokens that serve as semantic centers. Args: tokens: List of token IDs to calculate entropy for Returns: List of tuples containing (position, token_text, entropy_score) """ self._log_message("Calculating vector entropy...") entropy_scores: List[Tuple[int, str, float]] = [] # Calculate entropy for each token's embedding for i, token_id in enumerate(tokens): if token_id < len(self.embeddings): vector = self.embeddings[token_id] # Method 1: Shannon entropy of normalized vector components normalized_vector = vector / np.linalg.norm(vector) # Add small epsilon to avoid log(0) epsilon = 1e-10 # Use absolute values to avoid negative log values entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon)) # Method 2: Vector variance (higher variance = more entropy) variance = np.var(vector) variance_boost = 1.0 + variance * 0.1 # Incorporate weights for rarity - higher weights indicate rarer tokens if token_id < len(self.weights): # Weights are in range [0, 127], higher means rarer/more important weight_value = self.weights[token_id].item() # Normalize weight to create a rarity multiplier (e.g., 1.0 to 2.0 range) rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x else: rarity_multiplier = 1.0 # Combined entropy score with rarity adjustment entropy_score = entropy * variance_boost * rarity_multiplier # Get token text for reference token_text = self.tokenizer.decode([token_id]) entropy_scores.append((i, token_text, entropy_score)) return entropy_scores def find_entropy_centers(self, entropy_scores: List[Tuple[int, str, float]], percentile: int = 99) -> Tuple[List[Dict[str, Any]], float]: """Find high-entropy tokens to use as chunk centers. This method identifies tokens with entropy scores above the specified percentile to serve as semantic centers for chunking. Args: entropy_scores: List of (position, token_text, entropy_score) tuples percentile: Percentile threshold for selecting high-entropy tokens Returns: Tuple containing list of center dictionaries and entropy threshold """ self._log_message(f"Finding entropy centers (> {percentile}th percentile)...") # Calculate percentile threshold scores = [score for _, _, score in entropy_scores] if not scores: return [], 0.0 threshold = np.percentile(scores, percentile) self._log_message(f" Entropy threshold: {threshold:.3f}") # Find high-entropy tokens centers = [] for i, token, score in entropy_scores: if score >= threshold: centers.append({ 'position': i, 'token': token, 'score': score }) # Sort by entropy score (highest first) centers.sort(key=lambda x: x['score'], reverse=True) self._log_message(f" Found {len(centers)} entropy centers") return centers, threshold def calculate_adaptive_radius(self, center_score: float, avg_entropy: float, local_entropy_density: float, base_radius: int = 100) -> int: """Calculate adaptive radius based on entropy and local density. This method calculates a dynamic radius for chunking based on the center token's entropy score and the local entropy density. Args: center_score: Entropy score of the center token avg_entropy: Average entropy across all tokens local_entropy_density: Density of high-entropy tokens in the local area base_radius: Base radius to use as reference Returns: Integer radius value bounded between 50 and 200 tokens """ # Higher entropy centers get larger radius entropy_factor = center_score / avg_entropy if avg_entropy > 0 else 1.0 # Lower density areas get larger radius (sparse regions need more coverage) density_factor = 1.0 / (1.0 + local_entropy_density) # Adaptive radius with bounds adaptive_radius = int(base_radius * entropy_factor * density_factor) return max(50, min(200, adaptive_radius)) # Bound between 50-200 tokens def calculate_local_entropy_density(self, centers: List[Dict[str, Any]], position: int, window: int = 50) -> float: """Calculate entropy density around a position. This method calculates the density of high-entropy tokens within a specified window around a given position. Args: centers: List of high-entropy token centers position: Position to calculate density around window: Size of the window for density calculation Returns: Float representing the entropy density """ nearby_centers = [c for c in centers if abs(c['position'] - position) <= window] if not nearby_centers: return 0.0 return len(nearby_centers) / (2 * window + 1) def is_semantic_boundary(self, token_id: int, tokens: List[int], pos: int) -> bool: """Check if token position represents a language-agnostic semantic boundary. This method determines if a token position represents a semantic boundary by analyzing embedding similarities with neighboring tokens. Args: token_id: ID of the current token tokens: List of all token IDs pos: Position of the current token in the token list Returns: Boolean indicating whether this position is a semantic boundary """ if pos < 0 or pos >= len(tokens): return True # Document boundaries are always boundaries # Get current token embedding if token_id >= len(self.embeddings): return True current_vector = self.embeddings[token_id] # Calculate semantic similarity with neighbors boundary_scores = [] # Check left semantic discontinuity if pos > 0: left_token_id = tokens[pos - 1] if left_token_id < len(self.embeddings): left_vector = self.embeddings[left_token_id] # Cosine similarity (lower = more boundary-like) similarity = np.dot(current_vector, left_vector) / ( np.linalg.norm(current_vector) * np.linalg.norm(left_vector) ) boundary_scores.append(1.0 - similarity) # Convert to boundary score # Check right semantic discontinuity if pos < len(tokens) - 1: right_token_id = tokens[pos + 1] if right_token_id < len(self.embeddings): right_vector = self.embeddings[right_token_id] similarity = np.dot(current_vector, right_vector) / ( np.linalg.norm(current_vector) * np.linalg.norm(right_vector) ) boundary_scores.append(1.0 - similarity) # Convert to boundary score # Check larger context (2 tokens away) for broader boundaries if pos > 1: left2_token_id = tokens[pos - 2] if left2_token_id < len(self.embeddings): left2_vector = self.embeddings[left2_token_id] similarity = np.dot(current_vector, left2_vector) / ( np.linalg.norm(current_vector) * np.linalg.norm(left2_vector) ) boundary_scores.append(1.0 - similarity) if pos < len(tokens) - 2: right2_token_id = tokens[pos + 2] if right2_token_id < len(self.embeddings): right2_vector = self.embeddings[right2_token_id] similarity = np.dot(current_vector, right2_vector) / ( np.linalg.norm(current_vector) * np.linalg.norm(right2_vector) ) boundary_scores.append(1.0 - similarity) # Average boundary score if boundary_scores: avg_boundary_score = sum(boundary_scores) / len(boundary_scores) # Check if vector entropy is also high (indicating transition) if token_id < len(self.embeddings): vector = self.embeddings[token_id] normalized_vector = vector / np.linalg.norm(vector) epsilon = 1e-10 # Use absolute values to avoid negative log values entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon)) # Incorporate weights for rarity - higher weights indicate rarer tokens if token_id < len(self.weights): weight_value = self.weights[token_id].item() rarity_multiplier = 1.0 + (weight_value / 127.0) * 0.5 # Moderate boost for boundaries else: rarity_multiplier = 1.0 entropy = entropy * rarity_multiplier # Combine semantic discontinuity with entropy combined_score = avg_boundary_score * (1.0 + entropy * 0.001) # Threshold determined empirically - works across languages return combined_score > 0.4 return False def expand_to_boundaries(self, tokens: List[int], center_pos: int, max_expansion: int = 500) -> Tuple[int, int]: """Naturally expand from center until hitting natural boundaries. This method expands from a center position in both directions until semantic boundaries are encountered. Args: tokens: List of token IDs center_pos: Starting position for expansion max_expansion: Maximum number of tokens to expand in each direction Returns: Tuple containing start and end positions for the chunk """ start_pos = center_pos end_pos = center_pos # Expand left until boundary left_expansion = 0 while start_pos > 0 and left_expansion < max_expansion: prev_pos = start_pos - 1 if self.is_semantic_boundary(tokens[prev_pos], tokens, prev_pos): break start_pos = prev_pos left_expansion += 1 # Expand right until boundary right_expansion = 0 while end_pos < len(tokens) - 1 and right_expansion < max_expansion: next_pos = end_pos + 1 if self.is_semantic_boundary(tokens[next_pos], tokens, next_pos): break end_pos = next_pos right_expansion += 1 return start_pos, end_pos + 1 # +1 for inclusive range def create_radial_chunks(self, tokens: List[int], centers: List[Dict[str, Any]], max_expansion: int = 500) -> List[Dict[str, Any]]: """Slice corpus at midpoints between 99th percentile high-entropy tokens. This method creates chunks by slicing the text at midpoints between high-entropy token centers. Args: tokens: List of all token IDs in the document centers: List of high-entropy token centers max_expansion: Maximum expansion for chunk boundaries (not used in this method) Returns: List of chunk dictionaries with position and content information """ self._log_message(f"SLICING corpus at midpoints between {len(centers)} high-entropy centers...") if not centers: return [] # Sort centers by position sorted_centers = sorted(centers, key=lambda x: x['position']) chunks = [] # First chunk: start to first midpoint if len(sorted_centers) > 1: first_center = sorted_centers[0] second_center = sorted_centers[1] midpoint = (first_center['position'] + second_center['position']) // 2 start_pos = 0 end_pos = midpoint + 1 chunk_tokens = tokens[start_pos:end_pos] chunks.append({ 'center_pos': first_center['position'], 'center_token': first_center['token'], 'center_score': first_center['score'], 'start_pos': start_pos, 'end_pos': end_pos, 'tokens': chunk_tokens, 'chunk_id': len(chunks) }) # Middle chunks: between adjacent midpoints for i in range(1, len(sorted_centers) - 1): current_center = sorted_centers[i] prev_center = sorted_centers[i-1] next_center = sorted_centers[i+1] left_midpoint = (prev_center['position'] + current_center['position']) // 2 right_midpoint = (current_center['position'] + next_center['position']) // 2 start_pos = left_midpoint + 1 end_pos = right_midpoint + 1 chunk_tokens = tokens[start_pos:end_pos] chunks.append({ 'center_pos': current_center['position'], 'center_token': current_center['token'], 'center_score': current_center['score'], 'start_pos': start_pos, 'end_pos': end_pos, 'tokens': chunk_tokens, 'chunk_id': len(chunks) }) # Last chunk: last midpoint to end if len(sorted_centers) > 1: last_center = sorted_centers[-1] prev_center = sorted_centers[-2] midpoint = (prev_center['position'] + last_center['position']) // 2 start_pos = midpoint + 1 end_pos = len(tokens) chunk_tokens = tokens[start_pos:end_pos] chunks.append({ 'center_pos': last_center['position'], 'center_token': last_center['token'], 'center_score': last_center['score'], 'start_pos': start_pos, 'end_pos': end_pos, 'tokens': chunk_tokens, 'chunk_id': len(chunks) }) # If only one center, whole document is one chunk elif len(sorted_centers) == 1: center = sorted_centers[0] chunks.append({ 'center_pos': center['position'], 'center_token': center['token'], 'center_score': center['score'], 'start_pos': 0, 'end_pos': len(tokens), 'tokens': tokens, 'chunk_id': len(chunks) }) self._log_message(f" Sliced corpus into {len(chunks)} chunks at high-entropy midpoints") self._log_message(f" Perfect coverage: 100% (corpus fully partitioned)") self._log_message(f" Avg chunk size: {len(tokens)/len(chunks):.1f} tokens") return chunks def semantic_binary_search(self, query: str, max_depth: int = 5) -> List[Dict[str, Any]]: """Navigate corpus using semantic binary search on 99th percentile entropy tokens. This method performs a binary search in semantic space by analyzing high-entropy tokens in chunks and navigating toward the most relevant content based on query similarity. Args: query: Query string to search for max_depth: Maximum depth for semantic search navigation Returns: List of context chunks around the stopping point """ self._log_message(f"SEMANTIC BINARY SEARCH: '{query}'") self._log_message("=" * 60) # Encode query query_embedding = self.encode_text(query) # Get all chunks with their centers with self.get_connection() as conn: cursor = conn.execute(""" SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding FROM documents ORDER BY chunk_id """) chunks = [] for row in cursor: doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) similarity = self.cosine_similarity(query_embedding, doc_embedding) chunks.append({ 'chunk_id': row['chunk_id'], 'center_token': row['center_token'], 'center_score': row['entropy_score'], 'token_start': row['token_start'], 'token_end': row['token_end'], 'content': row['content'], 'similarity': similarity }) # Sort by similarity to find best starting point chunks.sort(key=lambda x: x['similarity'], reverse=True) if not chunks: self._log_message("No chunks found for semantic search") return [] # Start with best chunk current_chunk = chunks[0] path = [current_chunk] self._log_message(f"Starting at Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})") self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})") for depth in range(max_depth): self._log_message(f"\nDepth {depth + 1}: Analyzing Chunk {current_chunk['chunk_id']}") # Get tokens for this chunk (excluding center) chunk_tokens = [] with self.get_connection() as conn: cursor = conn.execute(""" SELECT content, token_start, token_end FROM documents WHERE chunk_id = ? """, (current_chunk['chunk_id'],)) row = cursor.fetchone() if row: # Reconstruct chunk tokens from stored content chunk_text = row['content'] chunk_tokens = self.tokenizer.encode(chunk_text, add_special_tokens=False) # Update current_chunk with token positions current_chunk['token_start'] = row['token_start'] current_chunk['token_end'] = row['token_end'] if len(chunk_tokens) < 3: self._log_message(f" Chunk too small for binary navigation") break # Find center position in chunk tokens center_token_id = self.tokenizer.encode(current_chunk['center_token'], add_special_tokens=False)[0] if self.tokenizer.encode(current_chunk['center_token'], add_special_tokens=False) else None if center_token_id is None: self._log_message(f" Cannot find center token in chunk") break try: center_pos = chunk_tokens.index(center_token_id) except ValueError: center_pos = len(chunk_tokens) // 2 # Fallback to middle self._log_message(f" Center token not found, using middle position {center_pos}") # Split into left and right sections left_tokens = chunk_tokens[:center_pos] right_tokens = chunk_tokens[center_pos + 1:] self._log_message(f" Left section: {len(left_tokens)} tokens") self._log_message(f" Right section: {len(right_tokens)} tokens") # Find 99th percentile high-entropy tokens in each section left_entropy_scores = [] for i, token_id in enumerate(left_tokens): if token_id < len(self.embeddings): vector = self.embeddings[token_id] normalized_vector = vector / np.linalg.norm(vector) epsilon = 1e-10 entropy = -np.sum(normalized_vector * np.log2(np.abs(normalized_vector) + epsilon)) variance = np.var(vector) # Incorporate weights for rarity - higher weights indicate rarer tokens if token_id < len(self.weights): weight_value = self.weights[token_id].item() rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x else: rarity_multiplier = 1.0 entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier left_entropy_scores.append((i, token_id, entropy_score)) right_entropy_scores = [] for i, token_id in enumerate(right_tokens): if token_id < len(self.embeddings): vector = self.embeddings[token_id] normalized_vector = vector / np.linalg.norm(vector) epsilon = 1e-10 entropy = -np.sum(normalized_vector * np.log2(np.abs(normalized_vector) + epsilon)) variance = np.var(vector) # Incorporate weights for rarity - higher weights indicate rarer tokens if token_id < len(self.weights): weight_value = self.weights[token_id].item() rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x else: rarity_multiplier = 1.0 entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier right_entropy_scores.append((i, token_id, entropy_score)) # Get 99th percentile threshold for each section if left_entropy_scores: left_scores = [score for _, _, score in left_entropy_scores] left_threshold = np.percentile(left_scores, 99) if len(left_scores) > 0 else 0 left_high_entropy = [(pos, token_id, score) for pos, token_id, score in left_entropy_scores if score >= left_threshold] else: left_high_entropy = [] left_threshold = 0 if right_entropy_scores: right_scores = [score for _, _, score in right_entropy_scores] right_threshold = np.percentile(right_scores, 99) if len(right_scores) > 0 else 0 right_high_entropy = [(pos, token_id, score) for pos, token_id, score in right_entropy_scores if score >= right_threshold] else: right_high_entropy = [] right_threshold = 0 self._log_message(f" Left 99th percentile threshold: {left_threshold:.2f} ({len(left_high_entropy)} tokens)") self._log_message(f" Right 99th percentile threshold: {right_threshold:.2f} ({len(right_high_entropy)} tokens)") if not left_high_entropy and not right_high_entropy: self._log_message(f" No high-entropy tokens found - search complete") break # Calculate query similarity to high-entropy tokens left_similarity_sum = 0 right_similarity_sum = 0 for pos, token_id, score in left_high_entropy: if token_id < len(self.embeddings): token_embedding = self.embeddings[token_id] similarity = self.cosine_similarity(query_embedding, token_embedding) left_similarity_sum += similarity * score for pos, token_id, score in right_high_entropy: if token_id < len(self.embeddings): token_embedding = self.embeddings[token_id] similarity = self.cosine_similarity(query_embedding, token_embedding) right_similarity_sum += similarity * score self._log_message(f" Left query similarity: {left_similarity_sum:.3f}") self._log_message(f" Right query similarity: {right_similarity_sum:.3f}") # Decide direction if left_similarity_sum > right_similarity_sum: direction = "LEFT" self._log_message(f" Query more similar to LEFT section") elif right_similarity_sum > left_similarity_sum: direction = "RIGHT" self._log_message(f" Query more similar to RIGHT section") else: direction = "EQUAL" self._log_message(f" Equal similarity - search complete") break # Find adjacent chunk in that direction with self.get_connection() as conn: if direction == "LEFT": cursor = conn.execute(""" SELECT chunk_id, center_token, entropy_score, content, embedding FROM documents WHERE token_end < ? ORDER BY token_end DESC LIMIT 1 """, (current_chunk['token_start'],)) else: # RIGHT cursor = conn.execute(""" SELECT chunk_id, center_token, entropy_score, content, embedding FROM documents WHERE token_start > ? ORDER BY token_start ASC LIMIT 1 """, (current_chunk['token_end'],)) next_row = cursor.fetchone() if next_row: next_embedding = np.frombuffer(next_row['embedding'], dtype=np.float32) next_similarity = self.cosine_similarity(query_embedding, next_embedding) current_chunk = { 'chunk_id': next_row['chunk_id'], 'center_token': next_row['center_token'], 'center_score': next_row['entropy_score'], 'content': next_row['content'], 'similarity': next_similarity } path.append(current_chunk) self._log_message(f" Moved {direction} to Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})") self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})") self._log_message(f" Content: \"{current_chunk['content'][:100]}...\"") else: self._log_message(f" No more chunks in {direction} direction") break self._log_message(f"\nSEMANTIC SEARCH PATH:") self._log_message("-" * 40) for i, chunk in enumerate(path): self._log_message(f"Step {i}: Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") self._log_message(f" Center: '{chunk['center_token']}' - \"{chunk['content'][:80]}...\"") # Return context window around stopping point (~512 target tokens) return self.get_context_window(current_chunk, query_embedding, target_tokens=512) def get_context_window(self, center_chunk, query_embedding, target_tokens=512): """Get surrounding chunks around the stopping point until reaching target token count""" print(f"\nšŸ” GETTING CONTEXT WINDOW: ~{target_tokens} tokens around Chunk {center_chunk['chunk_id']}") self._log_message("=" * 60) with self.get_connection() as conn: # Get all chunks ordered by chunk_id for token-based expansion cursor = conn.execute(""" SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding FROM documents ORDER BY chunk_id """) all_chunks = [] for row in cursor: doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) similarity = self.cosine_similarity(query_embedding, doc_embedding) tokens_in_chunk = row['token_end'] - row['token_start'] all_chunks.append({ 'chunk_id': row['chunk_id'], 'center_token': row['center_token'], 'center_score': row['entropy_score'], 'token_start': row['token_start'], 'token_end': row['token_end'], 'content': row['content'], 'similarity': similarity, 'tokens_count': tokens_in_chunk, 'direction': 'CENTER' if row['chunk_id'] == center_chunk['chunk_id'] else None }) # Find the center chunk position in the ordered list center_idx = None for i, chunk in enumerate(all_chunks): if chunk['chunk_id'] == center_chunk['chunk_id']: center_idx = i chunk['direction'] = 'CENTER' # Mark as center break if center_idx is None: self._log_message("Center chunk not found in all chunks list") return [] # Expand outwards from the center chunk until we reach target token count context_chunks = [all_chunks[center_idx]] total_tokens = all_chunks[center_idx]['tokens_count'] left_idx = center_idx - 1 right_idx = center_idx + 1 while total_tokens < target_tokens: # Determine which direction to expand (whichever has more available chunks or smaller addition) left_available = left_idx >= 0 right_available = right_idx < len(all_chunks) if not left_available and not right_available: break # No more chunks to add # If only one side is available, take from that side if left_available and not right_available: next_chunk = all_chunks[left_idx] left_idx -= 1 elif right_available and not left_available: next_chunk = all_chunks[right_idx] right_idx += 1 else: # Both sides available, take the one with fewer tokens to balance context left_chunk = all_chunks[left_idx] right_chunk = all_chunks[right_idx] if left_chunk['tokens_count'] <= right_chunk['tokens_count']: next_chunk = left_chunk left_idx -= 1 else: next_chunk = right_chunk right_idx += 1 # Add direction indicator direction = 'BEFORE' if next_chunk['chunk_id'] < center_chunk['chunk_id'] else 'AFTER' next_chunk['direction'] = direction context_chunks.append(next_chunk) total_tokens += next_chunk['tokens_count'] # Sort by similarity for display and return context_chunks_sorted = sorted(context_chunks, key=lambda x: x['similarity'], reverse=True) self._log_message(f"CONTEXT WINDOW RESULTS:") self._log_message(f" Total chunks: {len(context_chunks)}") self._log_message(f" Total tokens: {total_tokens}") self._log_message(f" Range: Chunk {min(c['chunk_id'] for c in context_chunks)} to Chunk {max(c['chunk_id'] for c in context_chunks)}") self._log_message(f" Center: Chunk {center_chunk['chunk_id']} ('{center_chunk['center_token']}')") self._log_message(f"\nTOP 10 MOST SIMILAR CHUNKS IN CONTEXT:") self._log_message("-" * 50) for i, chunk in enumerate(context_chunks_sorted[:10], 1): self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") self._log_message(f" Direction: {chunk['direction']}, Tokens: {chunk['tokens_count']}") self._log_message(f" Center: '{chunk['center_token']}' (entropy: {chunk['center_score']:.2f})") self._log_message(f" Content: \"{chunk['content'][:100]}...\"") self._log_message("") # Look for specific keywords in the context keywords = ['radar', 'cavity', 'microwave', 'waveguide', 'standing', 'waves', 'zero-point', 'energy'] keyword_matches = [] for chunk in context_chunks: content_lower = chunk['content'].lower() for keyword in keywords: if keyword in content_lower: if chunk not in keyword_matches: keyword_matches.append(chunk) break if keyword_matches: self._log_message(f"\nKEYWORD MATCHES FOUND:") self._log_message("-" * 40) for i, chunk in enumerate(keyword_matches[:5], 1): self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") # Show the keyword match content_lower = chunk['content'].lower() matched_keywords = [kw for kw in keywords if kw in content_lower] self._log_message(f" Keywords: {', '.join(matched_keywords)}") self._log_message(f" Content: \"{chunk['content'][:150]}...\"") self._log_message("") self._log_message(f"CONTEXT WINDOW SEARCH COMPLETE") self._log_message(f" Analyzed {len(context_chunks)} chunks (~{total_tokens} tokens) around the semantic search stopping point") return context_chunks_sorted def create_fallback_chunks(self, tokens: List[int], unassigned_positions: List[int], start_chunk_id: int, fallback_size: int = 64) -> List[Dict[str, Any]]: """Create chunks for unassigned tokens to ensure complete coverage. This method creates fallback chunks for any tokens that weren't assigned to high-entropy center-based chunks, ensuring complete document coverage. Args: tokens: List of all token IDs in the document unassigned_positions: List of token positions that weren't assigned to chunks start_chunk_id: Initial chunk ID to use for the fallback chunks fallback_size: Size of each fallback chunk Returns: List of fallback chunk dictionaries """ fallback_chunks = [] # Group consecutive unassigned tokens groups = [] current_group = [] for pos in unassigned_positions: if not current_group or pos == current_group[-1] + 1: current_group.append(pos) else: groups.append(current_group) current_group = [pos] if current_group: groups.append(current_group) # Create chunks for each group for group in groups: for i in range(0, len(group), fallback_size): chunk_start = group[i] chunk_end = group[min(i + fallback_size - 1, len(group) - 1)] chunk_tokens = tokens[chunk_start:chunk_end + 1] fallback_chunks.append({ 'center_pos': (chunk_start + chunk_end) // 2, 'center_token': 'fallback', 'center_score': 0.0, 'start_pos': chunk_start, 'end_pos': chunk_end + 1, 'tokens': chunk_tokens, 'chunk_id': start_chunk_id + len(fallback_chunks), 'radius': (chunk_end - chunk_start + 1) // 2, 'adaptive_radius': False, 'fallback': True, 'local_density': 0.0 }) return fallback_chunks def process_text_file(self, file_path: str, entropy_percentile: int = 85, chunk_radius: int = 100, enable_adaptive: bool = True) -> None: """Process text file with improved entropy-based radial chunking. This method processes a text file by tokenizing it, calculating entropy scores for each token, finding high-entropy centers, and creating radial chunks around these centers. Args: file_path: Path to the text file to process entropy_percentile: Percentile for selecting high-entropy tokens chunk_radius: Base radius for chunks enable_adaptive: Whether to enable adaptive chunking """ self._log_message(f"Processing: {file_path}") self._log_message(f" Entropy percentile: {entropy_percentile}%, Base radius: {chunk_radius}, Adaptive: {enable_adaptive}") # Read file with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: text = f.read() # Tokenize self._log_message("Tokenizing text...") tokens = self.tokenizer.encode(text, add_special_tokens=False) self._log_message(f" Total tokens: {len(tokens):,}") # Calculate entropy scores entropy_scores = self.calculate_token_entropy(tokens) # Find entropy centers centers, threshold = self.find_entropy_centers(entropy_scores, entropy_percentile) # Create radial chunks chunks = self.create_radial_chunks(tokens, centers, max_expansion=chunk_radius) # Clear existing data with self.get_connection() as conn: conn.execute("DELETE FROM documents") conn.execute("DELETE FROM entropy_centers") conn.commit() # Process and store chunks start_time = time.time() with self.get_connection() as conn: for chunk in chunks: # Decode chunk back to text chunk_text = self.tokenizer.decode(chunk['tokens']) # Generate embedding embedding = self.encode_text(chunk_text) # Store in database with enhanced metadata conn.execute(""" INSERT INTO documents (chunk_id, content, token_start, token_end, center_token, entropy_score, radius, adaptive_radius, local_density, fallback_chunk, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( chunk['chunk_id'], chunk_text, chunk['start_pos'], chunk['end_pos'], chunk['center_token'], chunk['center_score'], chunk.get('radius', 0), chunk.get('adaptive_radius', False), chunk.get('local_density', 0.0), chunk.get('fallback', False), embedding.tobytes() )) # Store entropy centers for center in centers: conn.execute(""" INSERT INTO entropy_centers (filename, token_position, token_text, entropy_score) VALUES (?, ?, ?, ?) """, (os.path.basename(file_path), center['position'], center['token'], center['score'])) # Store metadata avg_chunk_size = sum(len(c['tokens']) for c in chunks) / len(chunks) if chunks else 0 conn.execute(""" INSERT OR REPLACE INTO entropy_analysis (filename, total_tokens, total_chunks, entropy_threshold, high_entropy_tokens, avg_chunk_size) VALUES (?, ?, ?, ?, ?, ?) """, ( os.path.basename(file_path), len(tokens), len(chunks), threshold, len(centers), avg_chunk_size )) conn.commit() elapsed = time.time() - start_time self._log_message(f"Processing complete in {elapsed:.2f}s") if chunks: self._log_message(f" Avg time per chunk: {elapsed/len(chunks)*1000:.2f}ms") else: self._log_message(" Avg time per chunk: N/A") def encode_text(self, text: str) -> np.ndarray: """Encode text using mean pooling. This method converts text to embeddings by tokenizing it and then taking the mean of the token embeddings. Args: text: Input text to encode Returns: Normalized embedding vector """ tokens = self.tokenizer.encode(text, add_special_tokens=False) if not tokens: return np.zeros(self.embeddings.shape[1]) token_embeddings = [] for token_id in tokens: if token_id < len(self.embeddings): token_embeddings.append(self.embeddings[token_id]) if not token_embeddings: return np.zeros(self.embeddings.shape[1]) token_embeddings = np.array(token_embeddings, dtype=np.float32) final_embedding = np.mean(token_embeddings, axis=0) final_embedding = final_embedding / np.linalg.norm(final_embedding) return final_embedding def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float: """Calculate cosine similarity between two vectors. Args: a: First vector b: Second vector Returns: Cosine similarity value between -1 and 1 """ return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) def search(self, query: str, limit: int = 10, min_similarity: float = 0.1) -> List[Dict[str, Any]]: """Search with entropy-aware results. This method performs a similarity search for the given query and returns the most relevant text chunks based on embedding similarity. Args: query: Query string to search for limit: Maximum number of results to return min_similarity: Minimum similarity threshold for results Returns: List of result dictionaries containing chunk information """ self._log_message(f"Searching for: '{query}'") # Encode query query_embedding = self.encode_text(query) # Search in database start_time = time.time() with self.get_connection() as conn: cursor = conn.execute(""" SELECT chunk_id, content, token_start, token_end, center_token, entropy_score, radius, adaptive_radius, local_density, fallback_chunk, embedding FROM documents ORDER BY chunk_id """) results = [] for row in cursor: # Decode embedding doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) # Calculate similarity similarity = self.cosine_similarity(query_embedding, doc_embedding) if similarity >= min_similarity: results.append({ 'chunk_id': row['chunk_id'], 'content': row['content'][:200] + '...' if len(row['content']) > 200 else row['content'], 'full_content': row['content'], 'token_start': row['token_start'], 'token_end': row['token_end'], 'center_token': row['center_token'], 'entropy_score': row['entropy_score'], 'radius': row['radius'], 'adaptive_radius': bool(row['adaptive_radius']), 'local_density': row['local_density'], 'fallback': bool(row['fallback_chunk']), 'similarity': similarity }) # Sort by similarity results.sort(key=lambda x: x['similarity'], reverse=True) results = results[:limit] search_time = time.time() - start_time # Display results with enhanced chunk info self._log_message(f"Found {len(results)} results in {search_time:.3f}s:") self._log_message("-" * 80) for i, result in enumerate(results): chunk_type = "Fallback" if result['center_token'] == 'fallback' else "Radial" radius_info = f"radius: {result.get('radius', 'N/A')}" if 'radius' in result else "" density_info = f"density: {result.get('local_density', 0.0):.3f}" if 'local_density' in result else "" extra_info = [] if radius_info: extra_info.append(radius_info) if density_info: extra_info.append(density_info) if result.get('adaptive_radius', False): extra_info.append("adaptive") info_str = f" ({', '.join(extra_info)})" if extra_info else "" self._log_message(f"{i+1}. [{result['similarity']:.3f}] {chunk_type} Chunk {result['chunk_id']} (center: '{result['center_token']}', entropy: {result['entropy_score']:.2f}){info_str}") self._log_message(f" {result['content']}") self._log_message("") return results def get_entropy_stats(self) -> None: """Get entropy analysis statistics. This method retrieves and displays statistics about the entropy-based chunking process, including document counts, entropy centers, and processing information. """ with self.get_connection() as conn: # Document stats doc_stats = conn.execute("SELECT COUNT(*) as count FROM documents").fetchone() # Entropy analysis entropy_stats = conn.execute(""" SELECT filename, total_tokens, total_chunks, entropy_threshold, high_entropy_tokens, avg_chunk_size, processed_at FROM entropy_analysis ORDER BY processed_at DESC LIMIT 1 """).fetchone() # Top entropy centers top_centers = conn.execute(""" SELECT token_text, entropy_score, chunk_assigned FROM entropy_centers ORDER BY entropy_score DESC LIMIT 10 """).fetchall() self._log_message("Entropy-Based Search Statistics:") self._log_message(f" Total chunks: {doc_stats['count']:,}") if entropy_stats: self._log_message(f" File: {entropy_stats['filename']}") self._log_message(f" Total tokens: {entropy_stats['total_tokens']:,}") self._log_message(f" Entropy centers: {entropy_stats['high_entropy_tokens']:,}") self._log_message(f" Entropy threshold: {entropy_stats['entropy_threshold']:.3f}") self._log_message(f" Avg chunk size: {entropy_stats['avg_chunk_size']:.1f} tokens") self._log_message(f" Processed: {entropy_stats['processed_at']}") self._log_message(f"\nTop 10 High-Entropy Centers:") for i, center in enumerate(top_centers, 1): assigned = "āœ“" if center['chunk_assigned'] else "āœ—" self._log_message(f" {i:2d}. [{center['entropy_score']:.2f}] '{center['token_text']}' {assigned}") def main(): """Main function to demonstrate the EntropyRadialSearch functionality. This function initializes the search engine, processes a sample file if available, and performs test searches to demonstrate the capabilities of the entropy-based radial chunking system. """ print("Entropy-Based Radial Search Engine") print("=" * 50) # Initialize search engine search_engine = EntropyRadialSearch() # Check if sam.txt exists sam_file = "sam.txt" if os.path.exists(sam_file): print(f"Found {sam_file} ({os.path.getsize(sam_file)/1024/1024:.1f} MB)") # Process with improved entropy-based chunking (adaptive enabled) search_engine.process_text_file(sam_file, entropy_percentile=99, chunk_radius=100, enable_adaptive=True) # Show stats search_engine.get_entropy_stats() # Test with some queries test_queries = [ "Steven Lamoreaux Casimir experiment", "Jahn Teller effect dynamic", "quantum mechanics entanglement", "Casimir force measurement 1997" ] print(f"\nTesting entropy-based search:") print("-" * 50) for query in test_queries: search_engine.search(query, limit=3) print() print("Entropy-Based Search Engine Test Complete!") if __name__ == "__main__": main()