#!/usr/bin/env python3
"""
Entropy-Based Radial Chunking Search Engine
This module implements an advanced search engine that uses high-entropy tokens
as semantic centers for intelligent text chunking and retrieval. The system
employs semantic binary search and adaptive chunking strategies for efficient
information retrieval.
The algorithm works by:
1. Calculating entropy for each token based on its embedding
2. Identifying high-entropy tokens as semantic centers
3. Creating chunks around these centers
4. Performing semantic search using binary search in the semantic space
Module: rag.py
Author: Aninokuma from Stealth Hut
Date: 2025
Version: 1.0.0
License: MIT
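Example (illustrative usage; assumes the default model directory
"qwen3_int8_harmonic" and a local corpus file such as "sam.txt" are available):
    >>> engine = EntropyRadialSearch()
    >>> engine.process_text_file("sam.txt", entropy_percentile=99)
    >>> results = engine.search("Casimir force measurement 1997", limit=3)
    >>> context = engine.semantic_binary_search("Casimir force measurement 1997")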
"""
import sqlite3
import numpy as np
from safetensors import safe_open
from transformers import AutoTokenizer
import time
import os
from contextlib import contextmanager
from typing import Any, Dict, List, Tuple
class EntropyRadialSearch:
"""Search engine with entropy-based radial chunking
This class implements an advanced search system that uses high-entropy tokens
as semantic centers to create meaningful text chunks. The system provides
efficient retrieval through semantic binary search and adaptive chunking
strategies.
Attributes:
tokenizer: Pre-trained tokenizer for tokenizing text
embeddings: Token embeddings loaded from the model
db_path: Path to the SQLite database for storing chunks and metadata
"""
def __init__(self, model_dir: str = "qwen3_int8_harmonic", db_path: str = "entropy_radial_search.db"):
"""Initialize the EntropyRadialSearch instance.
Args:
model_dir: Directory containing the pre-trained model files
db_path: Path to the SQLite database file
"""
self._log_message("Loading Entropy-Based Radial Search Engine")
# Load model
self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
tensors = {}
with safe_open(f"{model_dir}/model.safetensors", framework="numpy") as f:
for key in f.keys():
tensors[key] = f.get_tensor(key)
self.embeddings = tensors["embeddings"]
self.weights = tensors["weights"] # Load the weights tensor for rarity computation
self._log_message(f"Model loaded: {len(self.embeddings)} tokens, weights tensor: {len(self.weights)} values")
# Initialize database
self.db_path = db_path
self.init_database()
self._log_message(f"Database ready: {db_path}")
def _log_message(self, message: str) -> None:
"""Log a message to standard output.
Args:
message: The message to log
"""
print(f"[LOG] {message}")
@contextmanager
def get_connection(self):
"""SQLite connection context manager
Provides a database connection that is automatically closed after use.
Yields:
sqlite3.Connection: Database connection object
"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def init_database(self) -> None:
"""Initialize database with enhanced schema
Creates the necessary tables and indexes for storing document chunks,
entropy analysis data, and entropy centers. This method ensures all
required database structures exist before the search engine is used.
"""
try:
with self.get_connection() as conn:
# Documents table with enhanced chunk info
conn.execute("""
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chunk_id INTEGER,
content TEXT,
token_start INTEGER,
token_end INTEGER,
center_token TEXT,
entropy_score REAL,
radius INTEGER,
adaptive_radius BOOLEAN DEFAULT FALSE,
local_density REAL DEFAULT 0.0,
fallback_chunk BOOLEAN DEFAULT FALSE,
embedding BLOB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Entropy analysis table
conn.execute("""
CREATE TABLE IF NOT EXISTS entropy_analysis (
filename TEXT PRIMARY KEY,
total_tokens INTEGER,
total_chunks INTEGER,
entropy_threshold REAL,
high_entropy_tokens INTEGER,
avg_chunk_size REAL,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# High-entropy centers table
conn.execute("""
CREATE TABLE IF NOT EXISTS entropy_centers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT,
token_position INTEGER,
token_text TEXT,
entropy_score REAL,
chunk_assigned BOOLEAN DEFAULT FALSE
)
""")
# Indexes
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunk_id ON documents(chunk_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_entropy_score ON documents(entropy_score)")
conn.commit()
except sqlite3.Error as e:
self._log_message(f"Database initialization error: {e}")
raise
def calculate_token_entropy(self, tokens: List[int]) -> List[Tuple[int, str, float]]:
"""Calculate entropy for each token based on vector embedding entropy.
This method calculates entropy scores for tokens using both Shannon entropy
of normalized vector components and vector variance to identify
high-entropy tokens that serve as semantic centers.
Args:
tokens: List of token IDs to calculate entropy for
Returns:
List of tuples containing (position, token_text, entropy_score)
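        Note (informal sketch of the score computed below, for token id t with
        embedding v and n = v / ||v||):
            H(t)     = -sum_i |n_i| * log2(|n_i| + 1e-10)
            score(t) = H(t) * (1 + 0.1 * var(v)) * (1 + weights[t] / 127)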
"""
self._log_message("Calculating vector entropy...")
entropy_scores: List[Tuple[int, str, float]] = []
# Calculate entropy for each token's embedding
for i, token_id in enumerate(tokens):
if token_id < len(self.embeddings):
vector = self.embeddings[token_id]
# Method 1: Shannon entropy of normalized vector components
normalized_vector = vector / np.linalg.norm(vector)
# Add small epsilon to avoid log(0)
epsilon = 1e-10
# Use absolute values to avoid negative log values
entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
# Method 2: Vector variance (higher variance = more entropy)
variance = np.var(vector)
variance_boost = 1.0 + variance * 0.1
# Incorporate weights for rarity - higher weights indicate rarer tokens
if token_id < len(self.weights):
# Weights are in range [0, 127], higher means rarer/more important
weight_value = self.weights[token_id].item()
# Normalize weight to create a rarity multiplier (e.g., 1.0 to 2.0 range)
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x
else:
rarity_multiplier = 1.0
# Combined entropy score with rarity adjustment
entropy_score = entropy * variance_boost * rarity_multiplier
# Get token text for reference
token_text = self.tokenizer.decode([token_id])
entropy_scores.append((i, token_text, entropy_score))
return entropy_scores
def find_entropy_centers(self, entropy_scores: List[Tuple[int, str, float]], percentile: int = 99) -> Tuple[List[Dict[str, Any]], float]:
"""Find high-entropy tokens to use as chunk centers.
This method identifies tokens with entropy scores above the specified
percentile to serve as semantic centers for chunking.
Args:
entropy_scores: List of (position, token_text, entropy_score) tuples
percentile: Percentile threshold for selecting high-entropy tokens
Returns:
Tuple containing list of center dictionaries and entropy threshold
"""
self._log_message(f"Finding entropy centers (> {percentile}th percentile)...")
# Calculate percentile threshold
scores = [score for _, _, score in entropy_scores]
if not scores:
return [], 0.0
threshold = np.percentile(scores, percentile)
self._log_message(f" Entropy threshold: {threshold:.3f}")
# Find high-entropy tokens
centers = []
for i, token, score in entropy_scores:
if score >= threshold:
centers.append({
'position': i,
'token': token,
'score': score
})
# Sort by entropy score (highest first)
centers.sort(key=lambda x: x['score'], reverse=True)
self._log_message(f" Found {len(centers)} entropy centers")
return centers, threshold
def calculate_adaptive_radius(self, center_score: float, avg_entropy: float, local_entropy_density: float, base_radius: int = 100) -> int:
"""Calculate adaptive radius based on entropy and local density.
This method calculates a dynamic radius for chunking based on the center
token's entropy score and the local entropy density.
Args:
center_score: Entropy score of the center token
avg_entropy: Average entropy across all tokens
local_entropy_density: Density of high-entropy tokens in the local area
base_radius: Base radius to use as reference
Returns:
Integer radius value bounded between 50 and 200 tokens
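        Note (informal sketch of the formula implemented below):
            radius = clip(base_radius * (center_score / avg_entropy) / (1 + local_entropy_density), 50, 200)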
"""
# Higher entropy centers get larger radius
entropy_factor = center_score / avg_entropy if avg_entropy > 0 else 1.0
# Lower density areas get larger radius (sparse regions need more coverage)
density_factor = 1.0 / (1.0 + local_entropy_density)
# Adaptive radius with bounds
adaptive_radius = int(base_radius * entropy_factor * density_factor)
return max(50, min(200, adaptive_radius)) # Bound between 50-200 tokens
def calculate_local_entropy_density(self, centers: List[Dict[str, Any]], position: int, window: int = 50) -> float:
"""Calculate entropy density around a position.
This method calculates the density of high-entropy tokens within a
specified window around a given position.
Args:
centers: List of high-entropy token centers
position: Position to calculate density around
window: Size of the window for density calculation
Returns:
Float representing the entropy density
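        Note: computed as the number of centers within +/- window of the
        position divided by the window size (2 * window + 1).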
"""
nearby_centers = [c for c in centers if abs(c['position'] - position) <= window]
if not nearby_centers:
return 0.0
return len(nearby_centers) / (2 * window + 1)
def is_semantic_boundary(self, token_id: int, tokens: List[int], pos: int) -> bool:
"""Check if token position represents a language-agnostic semantic boundary.
This method determines if a token position represents a semantic boundary
by analyzing embedding similarities with neighboring tokens.
Args:
token_id: ID of the current token
tokens: List of all token IDs
pos: Position of the current token in the token list
Returns:
Boolean indicating whether this position is a semantic boundary
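        Note (informal sketch of the rule implemented below): the score is the
        mean cosine distance to up to four neighbours (positions +/-1 and +/-2),
            boundary = mean_j (1 - cos(v_pos, v_j)),
        scaled by (1 + 0.001 * H), where H is a rarity-weighted vector entropy
        of the current token; the position is treated as a boundary when this
        combined score exceeds 0.4.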
"""
if pos < 0 or pos >= len(tokens):
return True # Document boundaries are always boundaries
# Get current token embedding
if token_id >= len(self.embeddings):
return True
current_vector = self.embeddings[token_id]
# Calculate semantic similarity with neighbors
boundary_scores = []
# Check left semantic discontinuity
if pos > 0:
left_token_id = tokens[pos - 1]
if left_token_id < len(self.embeddings):
left_vector = self.embeddings[left_token_id]
# Cosine similarity (lower = more boundary-like)
similarity = np.dot(current_vector, left_vector) / (
np.linalg.norm(current_vector) * np.linalg.norm(left_vector)
)
boundary_scores.append(1.0 - similarity) # Convert to boundary score
# Check right semantic discontinuity
if pos < len(tokens) - 1:
right_token_id = tokens[pos + 1]
if right_token_id < len(self.embeddings):
right_vector = self.embeddings[right_token_id]
similarity = np.dot(current_vector, right_vector) / (
np.linalg.norm(current_vector) * np.linalg.norm(right_vector)
)
boundary_scores.append(1.0 - similarity) # Convert to boundary score
# Check larger context (2 tokens away) for broader boundaries
if pos > 1:
left2_token_id = tokens[pos - 2]
if left2_token_id < len(self.embeddings):
left2_vector = self.embeddings[left2_token_id]
similarity = np.dot(current_vector, left2_vector) / (
np.linalg.norm(current_vector) * np.linalg.norm(left2_vector)
)
boundary_scores.append(1.0 - similarity)
if pos < len(tokens) - 2:
right2_token_id = tokens[pos + 2]
if right2_token_id < len(self.embeddings):
right2_vector = self.embeddings[right2_token_id]
similarity = np.dot(current_vector, right2_vector) / (
np.linalg.norm(current_vector) * np.linalg.norm(right2_vector)
)
boundary_scores.append(1.0 - similarity)
# Average boundary score
if boundary_scores:
avg_boundary_score = sum(boundary_scores) / len(boundary_scores)
# Check if vector entropy is also high (indicating transition)
if token_id < len(self.embeddings):
vector = self.embeddings[token_id]
normalized_vector = vector / np.linalg.norm(vector)
epsilon = 1e-10
# Use absolute values to avoid negative log values
entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
# Incorporate weights for rarity - higher weights indicate rarer tokens
if token_id < len(self.weights):
weight_value = self.weights[token_id].item()
rarity_multiplier = 1.0 + (weight_value / 127.0) * 0.5 # Moderate boost for boundaries
else:
rarity_multiplier = 1.0
entropy = entropy * rarity_multiplier
# Combine semantic discontinuity with entropy
combined_score = avg_boundary_score * (1.0 + entropy * 0.001)
# Threshold determined empirically - works across languages
return combined_score > 0.4
return False
def expand_to_boundaries(self, tokens: List[int], center_pos: int, max_expansion: int = 500) -> Tuple[int, int]:
"""Naturally expand from center until hitting natural boundaries.
This method expands from a center position in both directions until
semantic boundaries are encountered.
Args:
tokens: List of token IDs
center_pos: Starting position for expansion
max_expansion: Maximum number of tokens to expand in each direction
Returns:
Tuple containing start and end positions for the chunk
"""
start_pos = center_pos
end_pos = center_pos
# Expand left until boundary
left_expansion = 0
while start_pos > 0 and left_expansion < max_expansion:
prev_pos = start_pos - 1
if self.is_semantic_boundary(tokens[prev_pos], tokens, prev_pos):
break
start_pos = prev_pos
left_expansion += 1
# Expand right until boundary
right_expansion = 0
while end_pos < len(tokens) - 1 and right_expansion < max_expansion:
next_pos = end_pos + 1
if self.is_semantic_boundary(tokens[next_pos], tokens, next_pos):
break
end_pos = next_pos
right_expansion += 1
        return start_pos, end_pos + 1  # +1 so the exclusive end index includes end_pos
def create_radial_chunks(self, tokens: List[int], centers: List[Dict[str, Any]], max_expansion: int = 500) -> List[Dict[str, Any]]:
"""Slice corpus at midpoints between 99th percentile high-entropy tokens.
This method creates chunks by slicing the text at midpoints between
high-entropy token centers.
Args:
tokens: List of all token IDs in the document
centers: List of high-entropy token centers
max_expansion: Maximum expansion for chunk boundaries (not used in this method)
Returns:
List of chunk dictionaries with position and content information
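        Example (illustrative): for sorted center positions p0 < p1 < p2, the
        cut points are m0 = (p0 + p1) // 2 and m1 = (p1 + p2) // 2, giving
        chunks tokens[0:m0+1], tokens[m0+1:m1+1] and tokens[m1+1:len(tokens)].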
"""
self._log_message(f"SLICING corpus at midpoints between {len(centers)} high-entropy centers...")
if not centers:
return []
# Sort centers by position
sorted_centers = sorted(centers, key=lambda x: x['position'])
chunks = []
# First chunk: start to first midpoint
if len(sorted_centers) > 1:
first_center = sorted_centers[0]
second_center = sorted_centers[1]
midpoint = (first_center['position'] + second_center['position']) // 2
start_pos = 0
end_pos = midpoint + 1
chunk_tokens = tokens[start_pos:end_pos]
chunks.append({
'center_pos': first_center['position'],
'center_token': first_center['token'],
'center_score': first_center['score'],
'start_pos': start_pos,
'end_pos': end_pos,
'tokens': chunk_tokens,
'chunk_id': len(chunks)
})
# Middle chunks: between adjacent midpoints
for i in range(1, len(sorted_centers) - 1):
current_center = sorted_centers[i]
prev_center = sorted_centers[i-1]
next_center = sorted_centers[i+1]
left_midpoint = (prev_center['position'] + current_center['position']) // 2
right_midpoint = (current_center['position'] + next_center['position']) // 2
start_pos = left_midpoint + 1
end_pos = right_midpoint + 1
chunk_tokens = tokens[start_pos:end_pos]
chunks.append({
'center_pos': current_center['position'],
'center_token': current_center['token'],
'center_score': current_center['score'],
'start_pos': start_pos,
'end_pos': end_pos,
'tokens': chunk_tokens,
'chunk_id': len(chunks)
})
# Last chunk: last midpoint to end
if len(sorted_centers) > 1:
last_center = sorted_centers[-1]
prev_center = sorted_centers[-2]
midpoint = (prev_center['position'] + last_center['position']) // 2
start_pos = midpoint + 1
end_pos = len(tokens)
chunk_tokens = tokens[start_pos:end_pos]
chunks.append({
'center_pos': last_center['position'],
'center_token': last_center['token'],
'center_score': last_center['score'],
'start_pos': start_pos,
'end_pos': end_pos,
'tokens': chunk_tokens,
'chunk_id': len(chunks)
})
# If only one center, whole document is one chunk
elif len(sorted_centers) == 1:
center = sorted_centers[0]
chunks.append({
'center_pos': center['position'],
'center_token': center['token'],
'center_score': center['score'],
'start_pos': 0,
'end_pos': len(tokens),
'tokens': tokens,
'chunk_id': len(chunks)
})
self._log_message(f" Sliced corpus into {len(chunks)} chunks at high-entropy midpoints")
self._log_message(f" Perfect coverage: 100% (corpus fully partitioned)")
self._log_message(f" Avg chunk size: {len(tokens)/len(chunks):.1f} tokens")
return chunks
def semantic_binary_search(self, query: str, max_depth: int = 5) -> List[Dict[str, Any]]:
"""Navigate corpus using semantic binary search on 99th percentile entropy tokens.
This method performs a binary search in semantic space by analyzing
high-entropy tokens in chunks and navigating toward the most relevant
content based on query similarity.
Args:
query: Query string to search for
max_depth: Maximum depth for semantic search navigation
Returns:
List of context chunks around the stopping point
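        Example (illustrative; assumes a corpus has already been indexed with
        process_text_file):
            >>> context = engine.semantic_binary_search("Casimir force measurement 1997")
            >>> context[0]['content']  # most similar chunk in the returned context window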
"""
self._log_message(f"SEMANTIC BINARY SEARCH: '{query}'")
self._log_message("=" * 60)
# Encode query
query_embedding = self.encode_text(query)
# Get all chunks with their centers
with self.get_connection() as conn:
cursor = conn.execute("""
SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding
FROM documents
ORDER BY chunk_id
""")
chunks = []
for row in cursor:
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32)
similarity = self.cosine_similarity(query_embedding, doc_embedding)
chunks.append({
'chunk_id': row['chunk_id'],
'center_token': row['center_token'],
'center_score': row['entropy_score'],
'token_start': row['token_start'],
'token_end': row['token_end'],
'content': row['content'],
'similarity': similarity
})
# Sort by similarity to find best starting point
chunks.sort(key=lambda x: x['similarity'], reverse=True)
if not chunks:
self._log_message("No chunks found for semantic search")
return []
# Start with best chunk
current_chunk = chunks[0]
path = [current_chunk]
self._log_message(f"Starting at Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})")
self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})")
for depth in range(max_depth):
self._log_message(f"\nDepth {depth + 1}: Analyzing Chunk {current_chunk['chunk_id']}")
# Get tokens for this chunk (excluding center)
chunk_tokens = []
with self.get_connection() as conn:
cursor = conn.execute("""
SELECT content, token_start, token_end FROM documents WHERE chunk_id = ?
""", (current_chunk['chunk_id'],))
row = cursor.fetchone()
if row:
# Reconstruct chunk tokens from stored content
chunk_text = row['content']
chunk_tokens = self.tokenizer.encode(chunk_text, add_special_tokens=False)
# Update current_chunk with token positions
current_chunk['token_start'] = row['token_start']
current_chunk['token_end'] = row['token_end']
if len(chunk_tokens) < 3:
self._log_message(f" Chunk too small for binary navigation")
break
            # Find center position in chunk tokens
            center_token_ids = self.tokenizer.encode(current_chunk['center_token'], add_special_tokens=False)
            center_token_id = center_token_ids[0] if center_token_ids else None
if center_token_id is None:
self._log_message(f" Cannot find center token in chunk")
break
try:
center_pos = chunk_tokens.index(center_token_id)
except ValueError:
center_pos = len(chunk_tokens) // 2 # Fallback to middle
self._log_message(f" Center token not found, using middle position {center_pos}")
# Split into left and right sections
left_tokens = chunk_tokens[:center_pos]
right_tokens = chunk_tokens[center_pos + 1:]
self._log_message(f" Left section: {len(left_tokens)} tokens")
self._log_message(f" Right section: {len(right_tokens)} tokens")
# Find 99th percentile high-entropy tokens in each section
left_entropy_scores = []
for i, token_id in enumerate(left_tokens):
if token_id < len(self.embeddings):
vector = self.embeddings[token_id]
normalized_vector = vector / np.linalg.norm(vector)
epsilon = 1e-10
                    entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
variance = np.var(vector)
# Incorporate weights for rarity - higher weights indicate rarer tokens
if token_id < len(self.weights):
weight_value = self.weights[token_id].item()
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x
else:
rarity_multiplier = 1.0
entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier
left_entropy_scores.append((i, token_id, entropy_score))
right_entropy_scores = []
for i, token_id in enumerate(right_tokens):
if token_id < len(self.embeddings):
vector = self.embeddings[token_id]
normalized_vector = vector / np.linalg.norm(vector)
epsilon = 1e-10
                    entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
variance = np.var(vector)
# Incorporate weights for rarity - higher weights indicate rarer tokens
if token_id < len(self.weights):
weight_value = self.weights[token_id].item()
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 # Boost up to 2x
else:
rarity_multiplier = 1.0
entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier
right_entropy_scores.append((i, token_id, entropy_score))
# Get 99th percentile threshold for each section
if left_entropy_scores:
left_scores = [score for _, _, score in left_entropy_scores]
left_threshold = np.percentile(left_scores, 99) if len(left_scores) > 0 else 0
left_high_entropy = [(pos, token_id, score) for pos, token_id, score in left_entropy_scores if score >= left_threshold]
else:
left_high_entropy = []
left_threshold = 0
if right_entropy_scores:
right_scores = [score for _, _, score in right_entropy_scores]
right_threshold = np.percentile(right_scores, 99) if len(right_scores) > 0 else 0
right_high_entropy = [(pos, token_id, score) for pos, token_id, score in right_entropy_scores if score >= right_threshold]
else:
right_high_entropy = []
right_threshold = 0
self._log_message(f" Left 99th percentile threshold: {left_threshold:.2f} ({len(left_high_entropy)} tokens)")
self._log_message(f" Right 99th percentile threshold: {right_threshold:.2f} ({len(right_high_entropy)} tokens)")
if not left_high_entropy and not right_high_entropy:
self._log_message(f" No high-entropy tokens found - search complete")
break
# Calculate query similarity to high-entropy tokens
left_similarity_sum = 0
right_similarity_sum = 0
for pos, token_id, score in left_high_entropy:
if token_id < len(self.embeddings):
token_embedding = self.embeddings[token_id]
similarity = self.cosine_similarity(query_embedding, token_embedding)
left_similarity_sum += similarity * score
for pos, token_id, score in right_high_entropy:
if token_id < len(self.embeddings):
token_embedding = self.embeddings[token_id]
similarity = self.cosine_similarity(query_embedding, token_embedding)
right_similarity_sum += similarity * score
self._log_message(f" Left query similarity: {left_similarity_sum:.3f}")
self._log_message(f" Right query similarity: {right_similarity_sum:.3f}")
# Decide direction
if left_similarity_sum > right_similarity_sum:
direction = "LEFT"
self._log_message(f" Query more similar to LEFT section")
elif right_similarity_sum > left_similarity_sum:
direction = "RIGHT"
self._log_message(f" Query more similar to RIGHT section")
else:
direction = "EQUAL"
self._log_message(f" Equal similarity - search complete")
break
# Find adjacent chunk in that direction
with self.get_connection() as conn:
if direction == "LEFT":
cursor = conn.execute("""
SELECT chunk_id, center_token, entropy_score, content, embedding
FROM documents
WHERE token_end < ?
ORDER BY token_end DESC
LIMIT 1
""", (current_chunk['token_start'],))
else: # RIGHT
cursor = conn.execute("""
SELECT chunk_id, center_token, entropy_score, content, embedding
FROM documents
WHERE token_start > ?
ORDER BY token_start ASC
LIMIT 1
""", (current_chunk['token_end'],))
next_row = cursor.fetchone()
if next_row:
next_embedding = np.frombuffer(next_row['embedding'], dtype=np.float32)
next_similarity = self.cosine_similarity(query_embedding, next_embedding)
current_chunk = {
'chunk_id': next_row['chunk_id'],
'center_token': next_row['center_token'],
'center_score': next_row['entropy_score'],
'content': next_row['content'],
'similarity': next_similarity
}
path.append(current_chunk)
self._log_message(f" Moved {direction} to Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})")
self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})")
self._log_message(f" Content: \"{current_chunk['content'][:100]}...\"")
else:
self._log_message(f" No more chunks in {direction} direction")
break
self._log_message(f"\nSEMANTIC SEARCH PATH:")
self._log_message("-" * 40)
for i, chunk in enumerate(path):
self._log_message(f"Step {i}: Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})")
self._log_message(f" Center: '{chunk['center_token']}' - \"{chunk['content'][:80]}...\"")
# Return context window around stopping point (~512 target tokens)
return self.get_context_window(current_chunk, query_embedding, target_tokens=512)
    def get_context_window(self, center_chunk: Dict[str, Any], query_embedding: np.ndarray, target_tokens: int = 512) -> List[Dict[str, Any]]:
        """Collect surrounding chunks around the stopping point until reaching the target token count.
        Args:
            center_chunk: Chunk dictionary where the semantic binary search stopped
            query_embedding: Encoded query vector used to score chunks
            target_tokens: Approximate number of tokens to gather around the center
        Returns:
            List of context chunk dictionaries sorted by similarity to the query
        """
        self._log_message(f"\nGETTING CONTEXT WINDOW: ~{target_tokens} tokens around Chunk {center_chunk['chunk_id']}")
self._log_message("=" * 60)
with self.get_connection() as conn:
# Get all chunks ordered by chunk_id for token-based expansion
cursor = conn.execute("""
SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding
FROM documents
ORDER BY chunk_id
""")
all_chunks = []
for row in cursor:
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32)
similarity = self.cosine_similarity(query_embedding, doc_embedding)
tokens_in_chunk = row['token_end'] - row['token_start']
all_chunks.append({
'chunk_id': row['chunk_id'],
'center_token': row['center_token'],
'center_score': row['entropy_score'],
'token_start': row['token_start'],
'token_end': row['token_end'],
'content': row['content'],
'similarity': similarity,
'tokens_count': tokens_in_chunk,
'direction': 'CENTER' if row['chunk_id'] == center_chunk['chunk_id'] else None
})
# Find the center chunk position in the ordered list
center_idx = None
for i, chunk in enumerate(all_chunks):
if chunk['chunk_id'] == center_chunk['chunk_id']:
center_idx = i
chunk['direction'] = 'CENTER' # Mark as center
break
if center_idx is None:
self._log_message("Center chunk not found in all chunks list")
return []
# Expand outwards from the center chunk until we reach target token count
context_chunks = [all_chunks[center_idx]]
total_tokens = all_chunks[center_idx]['tokens_count']
left_idx = center_idx - 1
right_idx = center_idx + 1
while total_tokens < target_tokens:
            # Choose a side to expand: use whichever side still has chunks; if both do, take the smaller chunk to keep the window balanced
left_available = left_idx >= 0
right_available = right_idx < len(all_chunks)
if not left_available and not right_available:
break # No more chunks to add
# If only one side is available, take from that side
if left_available and not right_available:
next_chunk = all_chunks[left_idx]
left_idx -= 1
elif right_available and not left_available:
next_chunk = all_chunks[right_idx]
right_idx += 1
else:
# Both sides available, take the one with fewer tokens to balance context
left_chunk = all_chunks[left_idx]
right_chunk = all_chunks[right_idx]
if left_chunk['tokens_count'] <= right_chunk['tokens_count']:
next_chunk = left_chunk
left_idx -= 1
else:
next_chunk = right_chunk
right_idx += 1
# Add direction indicator
direction = 'BEFORE' if next_chunk['chunk_id'] < center_chunk['chunk_id'] else 'AFTER'
next_chunk['direction'] = direction
context_chunks.append(next_chunk)
total_tokens += next_chunk['tokens_count']
# Sort by similarity for display and return
context_chunks_sorted = sorted(context_chunks, key=lambda x: x['similarity'], reverse=True)
self._log_message(f"CONTEXT WINDOW RESULTS:")
self._log_message(f" Total chunks: {len(context_chunks)}")
self._log_message(f" Total tokens: {total_tokens}")
self._log_message(f" Range: Chunk {min(c['chunk_id'] for c in context_chunks)} to Chunk {max(c['chunk_id'] for c in context_chunks)}")
self._log_message(f" Center: Chunk {center_chunk['chunk_id']} ('{center_chunk['center_token']}')")
self._log_message(f"\nTOP 10 MOST SIMILAR CHUNKS IN CONTEXT:")
self._log_message("-" * 50)
for i, chunk in enumerate(context_chunks_sorted[:10], 1):
self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})")
self._log_message(f" Direction: {chunk['direction']}, Tokens: {chunk['tokens_count']}")
self._log_message(f" Center: '{chunk['center_token']}' (entropy: {chunk['center_score']:.2f})")
self._log_message(f" Content: \"{chunk['content'][:100]}...\"")
self._log_message("")
            # Look for specific keywords in the context (hard-coded diagnostics for the sample physics corpus)
keywords = ['radar', 'cavity', 'microwave', 'waveguide', 'standing', 'waves', 'zero-point', 'energy']
keyword_matches = []
for chunk in context_chunks:
content_lower = chunk['content'].lower()
for keyword in keywords:
if keyword in content_lower:
if chunk not in keyword_matches:
keyword_matches.append(chunk)
break
if keyword_matches:
self._log_message(f"\nKEYWORD MATCHES FOUND:")
self._log_message("-" * 40)
for i, chunk in enumerate(keyword_matches[:5], 1):
self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})")
# Show the keyword match
content_lower = chunk['content'].lower()
matched_keywords = [kw for kw in keywords if kw in content_lower]
self._log_message(f" Keywords: {', '.join(matched_keywords)}")
self._log_message(f" Content: \"{chunk['content'][:150]}...\"")
self._log_message("")
self._log_message(f"CONTEXT WINDOW SEARCH COMPLETE")
self._log_message(f" Analyzed {len(context_chunks)} chunks (~{total_tokens} tokens) around the semantic search stopping point")
return context_chunks_sorted
def create_fallback_chunks(self, tokens: List[int], unassigned_positions: List[int], start_chunk_id: int, fallback_size: int = 64) -> List[Dict[str, Any]]:
"""Create chunks for unassigned tokens to ensure complete coverage.
This method creates fallback chunks for any tokens that weren't assigned
to high-entropy center-based chunks, ensuring complete document coverage.
Args:
tokens: List of all token IDs in the document
unassigned_positions: List of token positions that weren't assigned to chunks
start_chunk_id: Initial chunk ID to use for the fallback chunks
fallback_size: Size of each fallback chunk
Returns:
List of fallback chunk dictionaries
"""
fallback_chunks = []
# Group consecutive unassigned tokens
groups = []
current_group = []
for pos in unassigned_positions:
if not current_group or pos == current_group[-1] + 1:
current_group.append(pos)
else:
groups.append(current_group)
current_group = [pos]
if current_group:
groups.append(current_group)
# Create chunks for each group
for group in groups:
for i in range(0, len(group), fallback_size):
chunk_start = group[i]
chunk_end = group[min(i + fallback_size - 1, len(group) - 1)]
chunk_tokens = tokens[chunk_start:chunk_end + 1]
fallback_chunks.append({
'center_pos': (chunk_start + chunk_end) // 2,
'center_token': 'fallback',
'center_score': 0.0,
'start_pos': chunk_start,
'end_pos': chunk_end + 1,
'tokens': chunk_tokens,
'chunk_id': start_chunk_id + len(fallback_chunks),
'radius': (chunk_end - chunk_start + 1) // 2,
'adaptive_radius': False,
'fallback': True,
'local_density': 0.0
})
return fallback_chunks
def process_text_file(self, file_path: str, entropy_percentile: int = 85, chunk_radius: int = 100, enable_adaptive: bool = True) -> None:
"""Process text file with improved entropy-based radial chunking.
This method processes a text file by tokenizing it, calculating entropy
scores for each token, finding high-entropy centers, and creating
radial chunks around these centers.
Args:
file_path: Path to the text file to process
entropy_percentile: Percentile for selecting high-entropy tokens
chunk_radius: Base radius for chunks
enable_adaptive: Whether to enable adaptive chunking
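        Example (illustrative; mirrors the call made in main()):
            >>> engine.process_text_file("sam.txt", entropy_percentile=99,
            ...                          chunk_radius=100, enable_adaptive=True)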
"""
self._log_message(f"Processing: {file_path}")
self._log_message(f" Entropy percentile: {entropy_percentile}%, Base radius: {chunk_radius}, Adaptive: {enable_adaptive}")
# Read file
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
# Tokenize
self._log_message("Tokenizing text...")
tokens = self.tokenizer.encode(text, add_special_tokens=False)
self._log_message(f" Total tokens: {len(tokens):,}")
# Calculate entropy scores
entropy_scores = self.calculate_token_entropy(tokens)
# Find entropy centers
centers, threshold = self.find_entropy_centers(entropy_scores, entropy_percentile)
# Create radial chunks
chunks = self.create_radial_chunks(tokens, centers, max_expansion=chunk_radius)
# Clear existing data
with self.get_connection() as conn:
conn.execute("DELETE FROM documents")
conn.execute("DELETE FROM entropy_centers")
conn.commit()
# Process and store chunks
start_time = time.time()
with self.get_connection() as conn:
for chunk in chunks:
# Decode chunk back to text
chunk_text = self.tokenizer.decode(chunk['tokens'])
# Generate embedding
embedding = self.encode_text(chunk_text)
# Store in database with enhanced metadata
conn.execute("""
INSERT INTO documents
(chunk_id, content, token_start, token_end, center_token, entropy_score,
radius, adaptive_radius, local_density, fallback_chunk, embedding)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
chunk['chunk_id'],
chunk_text,
chunk['start_pos'],
chunk['end_pos'],
chunk['center_token'],
chunk['center_score'],
chunk.get('radius', 0),
chunk.get('adaptive_radius', False),
chunk.get('local_density', 0.0),
chunk.get('fallback', False),
embedding.tobytes()
))
# Store entropy centers
for center in centers:
conn.execute("""
INSERT INTO entropy_centers
(filename, token_position, token_text, entropy_score)
VALUES (?, ?, ?, ?)
""", (os.path.basename(file_path), center['position'], center['token'], center['score']))
# Store metadata
avg_chunk_size = sum(len(c['tokens']) for c in chunks) / len(chunks) if chunks else 0
conn.execute("""
INSERT OR REPLACE INTO entropy_analysis
(filename, total_tokens, total_chunks, entropy_threshold, high_entropy_tokens, avg_chunk_size)
VALUES (?, ?, ?, ?, ?, ?)
""", (
os.path.basename(file_path),
len(tokens),
len(chunks),
threshold,
len(centers),
avg_chunk_size
))
conn.commit()
elapsed = time.time() - start_time
self._log_message(f"Processing complete in {elapsed:.2f}s")
if chunks:
self._log_message(f" Avg time per chunk: {elapsed/len(chunks)*1000:.2f}ms")
else:
self._log_message(" Avg time per chunk: N/A")
def encode_text(self, text: str) -> np.ndarray:
"""Encode text using mean pooling.
This method converts text to embeddings by tokenizing it and then
taking the mean of the token embeddings.
Args:
text: Input text to encode
Returns:
Normalized embedding vector
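        Note: computed as normalize(mean_t embeddings[t]) over the known tokens
        of the text; a zero vector is returned when no known tokens are found.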
"""
tokens = self.tokenizer.encode(text, add_special_tokens=False)
if not tokens:
            return np.zeros(self.embeddings.shape[1], dtype=np.float32)
token_embeddings = []
for token_id in tokens:
if token_id < len(self.embeddings):
token_embeddings.append(self.embeddings[token_id])
if not token_embeddings:
            return np.zeros(self.embeddings.shape[1], dtype=np.float32)
token_embeddings = np.array(token_embeddings, dtype=np.float32)
final_embedding = np.mean(token_embeddings, axis=0)
final_embedding = final_embedding / np.linalg.norm(final_embedding)
return final_embedding
def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Calculate cosine similarity between two vectors.
Args:
a: First vector
b: Second vector
Returns:
Cosine similarity value between -1 and 1
"""
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)
def search(self, query: str, limit: int = 10, min_similarity: float = 0.1) -> List[Dict[str, Any]]:
"""Search with entropy-aware results.
This method performs a similarity search for the given query and
returns the most relevant text chunks based on embedding similarity.
Args:
query: Query string to search for
limit: Maximum number of results to return
min_similarity: Minimum similarity threshold for results
Returns:
List of result dictionaries containing chunk information
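        Example (illustrative; assumes a processed corpus):
            >>> results = engine.search("quantum mechanics entanglement", limit=3)
            >>> results[0]['similarity'], results[0]['center_token']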
"""
self._log_message(f"Searching for: '{query}'")
# Encode query
query_embedding = self.encode_text(query)
# Search in database
start_time = time.time()
with self.get_connection() as conn:
cursor = conn.execute("""
SELECT chunk_id, content, token_start, token_end, center_token, entropy_score,
radius, adaptive_radius, local_density, fallback_chunk, embedding
FROM documents
ORDER BY chunk_id
""")
results = []
for row in cursor:
# Decode embedding
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32)
# Calculate similarity
similarity = self.cosine_similarity(query_embedding, doc_embedding)
if similarity >= min_similarity:
results.append({
'chunk_id': row['chunk_id'],
'content': row['content'][:200] + '...' if len(row['content']) > 200 else row['content'],
'full_content': row['content'],
'token_start': row['token_start'],
'token_end': row['token_end'],
'center_token': row['center_token'],
'entropy_score': row['entropy_score'],
'radius': row['radius'],
'adaptive_radius': bool(row['adaptive_radius']),
'local_density': row['local_density'],
'fallback': bool(row['fallback_chunk']),
'similarity': similarity
})
# Sort by similarity
results.sort(key=lambda x: x['similarity'], reverse=True)
results = results[:limit]
search_time = time.time() - start_time
# Display results with enhanced chunk info
self._log_message(f"Found {len(results)} results in {search_time:.3f}s:")
self._log_message("-" * 80)
for i, result in enumerate(results):
chunk_type = "Fallback" if result['center_token'] == 'fallback' else "Radial"
radius_info = f"radius: {result.get('radius', 'N/A')}" if 'radius' in result else ""
density_info = f"density: {result.get('local_density', 0.0):.3f}" if 'local_density' in result else ""
extra_info = []
if radius_info:
extra_info.append(radius_info)
if density_info:
extra_info.append(density_info)
if result.get('adaptive_radius', False):
extra_info.append("adaptive")
info_str = f" ({', '.join(extra_info)})" if extra_info else ""
self._log_message(f"{i+1}. [{result['similarity']:.3f}] {chunk_type} Chunk {result['chunk_id']} (center: '{result['center_token']}', entropy: {result['entropy_score']:.2f}){info_str}")
self._log_message(f" {result['content']}")
self._log_message("")
return results
def get_entropy_stats(self) -> None:
"""Get entropy analysis statistics.
This method retrieves and displays statistics about the entropy-based
chunking process, including document counts, entropy centers, and
processing information.
"""
with self.get_connection() as conn:
# Document stats
doc_stats = conn.execute("SELECT COUNT(*) as count FROM documents").fetchone()
# Entropy analysis
entropy_stats = conn.execute("""
SELECT filename, total_tokens, total_chunks, entropy_threshold,
high_entropy_tokens, avg_chunk_size, processed_at
FROM entropy_analysis
ORDER BY processed_at DESC
LIMIT 1
""").fetchone()
# Top entropy centers
top_centers = conn.execute("""
SELECT token_text, entropy_score, chunk_assigned
FROM entropy_centers
ORDER BY entropy_score DESC
LIMIT 10
""").fetchall()
self._log_message("Entropy-Based Search Statistics:")
self._log_message(f" Total chunks: {doc_stats['count']:,}")
if entropy_stats:
self._log_message(f" File: {entropy_stats['filename']}")
self._log_message(f" Total tokens: {entropy_stats['total_tokens']:,}")
self._log_message(f" Entropy centers: {entropy_stats['high_entropy_tokens']:,}")
self._log_message(f" Entropy threshold: {entropy_stats['entropy_threshold']:.3f}")
self._log_message(f" Avg chunk size: {entropy_stats['avg_chunk_size']:.1f} tokens")
self._log_message(f" Processed: {entropy_stats['processed_at']}")
self._log_message(f"\nTop 10 High-Entropy Centers:")
for i, center in enumerate(top_centers, 1):
assigned = "✓" if center['chunk_assigned'] else "✗"
self._log_message(f" {i:2d}. [{center['entropy_score']:.2f}] '{center['token_text']}' {assigned}")
def main():
"""Main function to demonstrate the EntropyRadialSearch functionality.
This function initializes the search engine, processes a sample file if
available, and performs test searches to demonstrate the capabilities
of the entropy-based radial chunking system.
"""
print("Entropy-Based Radial Search Engine")
print("=" * 50)
# Initialize search engine
search_engine = EntropyRadialSearch()
# Check if sam.txt exists
sam_file = "sam.txt"
if os.path.exists(sam_file):
print(f"Found {sam_file} ({os.path.getsize(sam_file)/1024/1024:.1f} MB)")
# Process with improved entropy-based chunking (adaptive enabled)
search_engine.process_text_file(sam_file, entropy_percentile=99, chunk_radius=100, enable_adaptive=True)
# Show stats
search_engine.get_entropy_stats()
# Test with some queries
test_queries = [
"Steven Lamoreaux Casimir experiment",
"Jahn Teller effect dynamic",
"quantum mechanics entanglement",
"Casimir force measurement 1997"
]
print(f"\nTesting entropy-based search:")
print("-" * 50)
for query in test_queries:
search_engine.search(query, limit=3)
print()
print("Entropy-Based Search Engine Test Complete!")
if __name__ == "__main__":
main()