|
|
|
|
|
""" |
|
|
Entropy-Based Radial Chunking Search Engine |
|
|
|
|
|
This module implements a search engine that uses high-entropy tokens as
semantic centers for text chunking and retrieval. It combines semantic
binary search with adaptive chunking strategies to navigate large documents
efficiently.
|
|
|
|
|
The algorithm works by: |
|
|
1. Calculating entropy for each token based on its embedding |
|
|
2. Identifying high-entropy tokens as semantic centers |
|
|
3. Creating chunks around these centers |
|
|
4. Performing semantic search using binary search in the semantic space |
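
Example (illustrative sketch; assumes a compatible model directory and an
input text file are available on disk):

    engine = EntropyRadialSearch(model_dir="qwen3_int8_harmonic",
                                 db_path="entropy_radial_search.db")
    engine.process_text_file("sam.txt", entropy_percentile=99)
    results = engine.search("Casimir force measurement", limit=3)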
|
|
|
|
|
Module: rag.py |
|
|
Author: Aninokuma from Stealth Hut |
|
|
Date: 2025 |
|
|
Version: 1.0.0 |
|
|
License: MIT |
|
|
""" |
|
|
|
|
|
import sqlite3 |
|
|
import numpy as np |
|
|
from safetensors import safe_open |
|
|
from transformers import AutoTokenizer |
|
|
import time |
|
|
import os |
|
|
from collections import defaultdict, Counter |
|
|
import math |
|
|
from contextlib import contextmanager |
|
|
from typing import List, Dict, Tuple, Optional, Any |
|
|
|
|
|
|
|
|
class EntropyRadialSearch: |
|
|
"""Search engine with entropy-based radial chunking |
|
|
|
|
|
    This class uses high-entropy tokens as semantic centers to create
    meaningful text chunks and retrieves them efficiently through semantic
    binary search and adaptive chunking strategies.
|
|
|
|
|
Attributes: |
|
|
        tokenizer: Pre-trained tokenizer used to tokenize text
        embeddings: Token embedding matrix loaded from the model
        weights: Per-token weight values used as a rarity signal when scoring entropy
        db_path: Path to the SQLite database for storing chunks and metadata
|
|
""" |
|
|
|
|
|
def __init__(self, model_dir: str = "qwen3_int8_harmonic", db_path: str = "entropy_radial_search.db"): |
|
|
"""Initialize the EntropyRadialSearch instance. |
|
|
|
|
|
Args: |
|
|
model_dir: Directory containing the pre-trained model files |
|
|
db_path: Path to the SQLite database file |
|
|
""" |
|
|
self._log_message("Loading Entropy-Based Radial Search Engine") |
|
|
|
|
|
|
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_dir) |
|
|
tensors = {} |
|
|
with safe_open(f"{model_dir}/model.safetensors", framework="numpy") as f: |
|
|
for key in f.keys(): |
|
|
tensors[key] = f.get_tensor(key) |
|
|
self.embeddings = tensors["embeddings"] |
|
|
self.weights = tensors["weights"] |
|
|
self._log_message(f"Model loaded: {len(self.embeddings)} tokens, weights tensor: {len(self.weights)} values") |
|
|
|
|
|
|
|
|
self.db_path = db_path |
|
|
self.init_database() |
|
|
self._log_message(f"Database ready: {db_path}") |
|
|
|
|
|
def _log_message(self, message: str) -> None: |
|
|
"""Log a message to standard output. |
|
|
|
|
|
Args: |
|
|
message: The message to log |
|
|
""" |
|
|
print(f"[LOG] {message}") |
|
|
|
|
|
@contextmanager |
|
|
def get_connection(self): |
|
|
"""SQLite connection context manager |
|
|
|
|
|
Provides a database connection that is automatically closed after use. |
|
|
|
|
|
Yields: |
|
|
sqlite3.Connection: Database connection object |
|
|
""" |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
conn.row_factory = sqlite3.Row |
|
|
try: |
|
|
yield conn |
|
|
finally: |
|
|
conn.close() |
|
|
|
|
|
def init_database(self) -> None: |
|
|
"""Initialize database with enhanced schema |
|
|
|
|
|
Creates the necessary tables and indexes for storing document chunks, |
|
|
entropy analysis data, and entropy centers. This method ensures all |
|
|
required database structures exist before the search engine is used. |
|
|
""" |
|
|
try: |
|
|
with self.get_connection() as conn: |
|
|
|
|
|
conn.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS documents ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
chunk_id INTEGER, |
|
|
content TEXT, |
|
|
token_start INTEGER, |
|
|
token_end INTEGER, |
|
|
center_token TEXT, |
|
|
entropy_score REAL, |
|
|
radius INTEGER, |
|
|
adaptive_radius BOOLEAN DEFAULT FALSE, |
|
|
local_density REAL DEFAULT 0.0, |
|
|
fallback_chunk BOOLEAN DEFAULT FALSE, |
|
|
embedding BLOB, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
conn.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS entropy_analysis ( |
|
|
filename TEXT PRIMARY KEY, |
|
|
total_tokens INTEGER, |
|
|
total_chunks INTEGER, |
|
|
entropy_threshold REAL, |
|
|
high_entropy_tokens INTEGER, |
|
|
avg_chunk_size REAL, |
|
|
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
conn.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS entropy_centers ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
filename TEXT, |
|
|
token_position INTEGER, |
|
|
token_text TEXT, |
|
|
entropy_score REAL, |
|
|
chunk_assigned BOOLEAN DEFAULT FALSE |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunk_id ON documents(chunk_id)") |
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_entropy_score ON documents(entropy_score)") |
|
|
conn.commit() |
|
|
except sqlite3.Error as e: |
|
|
self._log_message(f"Database initialization error: {e}") |
|
|
raise |
|
|
|
|
|
def calculate_token_entropy(self, tokens: List[int]) -> List[Tuple[int, str, float]]: |
|
|
"""Calculate entropy for each token based on vector embedding entropy. |
|
|
|
|
|
This method calculates entropy scores for tokens using both Shannon entropy |
|
|
of normalized vector components and vector variance to identify |
|
|
high-entropy tokens that serve as semantic centers. |
|
|
|
|
|
Args: |
|
|
tokens: List of token IDs to calculate entropy for |
|
|
|
|
|
Returns: |
|
|
List of tuples containing (position, token_text, entropy_score) |
|
|
""" |
|
|
self._log_message("Calculating vector entropy...") |
|
|
|
|
|
entropy_scores: List[Tuple[int, str, float]] = [] |
|
|
|
|
|
|
|
|
for i, token_id in enumerate(tokens): |
|
|
if token_id < len(self.embeddings): |
|
|
vector = self.embeddings[token_id] |
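                # Entropy score = Shannon entropy of the absolute values of the
                # normalized embedding components, boosted by the vector's variance
                # and by a rarity multiplier derived from the per-token weight table.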
|
|
|
|
|
|
|
|
normalized_vector = vector / np.linalg.norm(vector) |
|
|
|
|
|
epsilon = 1e-10 |
|
|
|
|
|
entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon)) |
|
|
|
|
|
|
|
|
variance = np.var(vector) |
|
|
variance_boost = 1.0 + variance * 0.1 |
|
|
|
|
|
|
|
|
if token_id < len(self.weights): |
|
|
|
|
|
weight_value = self.weights[token_id].item() |
|
|
|
|
|
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 |
|
|
else: |
|
|
rarity_multiplier = 1.0 |
|
|
|
|
|
|
|
|
entropy_score = entropy * variance_boost * rarity_multiplier |
|
|
|
|
|
|
|
|
token_text = self.tokenizer.decode([token_id]) |
|
|
|
|
|
entropy_scores.append((i, token_text, entropy_score)) |
|
|
|
|
|
return entropy_scores |
|
|
|
|
|
def find_entropy_centers(self, entropy_scores: List[Tuple[int, str, float]], percentile: int = 99) -> Tuple[List[Dict[str, Any]], float]: |
|
|
"""Find high-entropy tokens to use as chunk centers. |
|
|
|
|
|
This method identifies tokens with entropy scores above the specified |
|
|
percentile to serve as semantic centers for chunking. |
|
|
|
|
|
Args: |
|
|
entropy_scores: List of (position, token_text, entropy_score) tuples |
|
|
percentile: Percentile threshold for selecting high-entropy tokens |
|
|
|
|
|
Returns: |
|
|
Tuple containing list of center dictionaries and entropy threshold |
|
|
""" |
|
|
self._log_message(f"Finding entropy centers (> {percentile}th percentile)...") |
|
|
|
|
|
|
|
|
scores = [score for _, _, score in entropy_scores] |
|
|
if not scores: |
|
|
return [], 0.0 |
|
|
threshold = np.percentile(scores, percentile) |
|
|
|
|
|
self._log_message(f" Entropy threshold: {threshold:.3f}") |
|
|
|
|
|
|
|
|
centers = [] |
|
|
for i, token, score in entropy_scores: |
|
|
if score >= threshold: |
|
|
centers.append({ |
|
|
'position': i, |
|
|
'token': token, |
|
|
'score': score |
|
|
}) |
|
|
|
|
|
|
|
|
centers.sort(key=lambda x: x['score'], reverse=True) |
|
|
|
|
|
self._log_message(f" Found {len(centers)} entropy centers") |
|
|
return centers, threshold |
|
|
|
|
|
def calculate_adaptive_radius(self, center_score: float, avg_entropy: float, local_entropy_density: float, base_radius: int = 100) -> int: |
|
|
"""Calculate adaptive radius based on entropy and local density. |
|
|
|
|
|
This method calculates a dynamic radius for chunking based on the center |
|
|
token's entropy score and the local entropy density. |
|
|
|
|
|
Args: |
|
|
center_score: Entropy score of the center token |
|
|
avg_entropy: Average entropy across all tokens |
|
|
local_entropy_density: Density of high-entropy tokens in the local area |
|
|
base_radius: Base radius to use as reference |
|
|
|
|
|
Returns: |
|
|
Integer radius value bounded between 50 and 200 tokens |
|
|
""" |
|
|
|
|
|
entropy_factor = center_score / avg_entropy if avg_entropy > 0 else 1.0 |
|
|
|
|
|
|
|
|
density_factor = 1.0 / (1.0 + local_entropy_density) |
|
|
|
|
|
|
|
|
adaptive_radius = int(base_radius * entropy_factor * density_factor) |
|
|
return max(50, min(200, adaptive_radius)) |
|
|
|
|
|
def calculate_local_entropy_density(self, centers: List[Dict[str, Any]], position: int, window: int = 50) -> float: |
|
|
"""Calculate entropy density around a position. |
|
|
|
|
|
This method calculates the density of high-entropy tokens within a |
|
|
specified window around a given position. |
|
|
|
|
|
Args: |
|
|
centers: List of high-entropy token centers |
|
|
position: Position to calculate density around |
|
|
window: Size of the window for density calculation |
|
|
|
|
|
Returns: |
|
|
Float representing the entropy density |
|
|
""" |
|
|
nearby_centers = [c for c in centers if abs(c['position'] - position) <= window] |
|
|
if not nearby_centers: |
|
|
return 0.0 |
|
|
return len(nearby_centers) / (2 * window + 1) |
|
|
|
|
|
def is_semantic_boundary(self, token_id: int, tokens: List[int], pos: int) -> bool: |
|
|
"""Check if token position represents a language-agnostic semantic boundary. |
|
|
|
|
|
This method determines if a token position represents a semantic boundary |
|
|
by analyzing embedding similarities with neighboring tokens. |
|
|
|
|
|
Args: |
|
|
token_id: ID of the current token |
|
|
tokens: List of all token IDs |
|
|
pos: Position of the current token in the token list |
|
|
|
|
|
Returns: |
|
|
Boolean indicating whether this position is a semantic boundary |
|
|
""" |
|
|
if pos < 0 or pos >= len(tokens): |
|
|
return True |
|
|
|
|
|
|
|
|
if token_id >= len(self.embeddings): |
|
|
return True |
|
|
current_vector = self.embeddings[token_id] |
|
|
|
|
|
|
|
|
boundary_scores = [] |
|
|
|
|
|
|
|
|
        # Compare the current token with its neighbours at offsets -1, +1, -2 and +2;
        # each valid neighbour contributes a dissimilarity score (1 - cosine similarity).
        for offset in (-1, 1, -2, 2):
            neighbor_pos = pos + offset
            if 0 <= neighbor_pos < len(tokens):
                neighbor_token_id = tokens[neighbor_pos]
                if neighbor_token_id < len(self.embeddings):
                    neighbor_vector = self.embeddings[neighbor_token_id]
                    similarity = np.dot(current_vector, neighbor_vector) / (
                        np.linalg.norm(current_vector) * np.linalg.norm(neighbor_vector)
                    )
                    boundary_scores.append(1.0 - similarity)
|
|
|
|
|
|
|
|
if boundary_scores: |
|
|
avg_boundary_score = sum(boundary_scores) / len(boundary_scores) |
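            # The average neighbour dissimilarity is nudged by the token's own
            # rarity-weighted entropy; combined scores above 0.4 are treated as boundaries.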
|
|
|
|
|
|
|
|
if token_id < len(self.embeddings): |
|
|
vector = self.embeddings[token_id] |
|
|
normalized_vector = vector / np.linalg.norm(vector) |
|
|
epsilon = 1e-10 |
|
|
|
|
|
entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon)) |
|
|
|
|
|
|
|
|
if token_id < len(self.weights): |
|
|
weight_value = self.weights[token_id].item() |
|
|
rarity_multiplier = 1.0 + (weight_value / 127.0) * 0.5 |
|
|
else: |
|
|
rarity_multiplier = 1.0 |
|
|
entropy = entropy * rarity_multiplier |
|
|
|
|
|
|
|
|
combined_score = avg_boundary_score * (1.0 + entropy * 0.001) |
|
|
|
|
|
|
|
|
return combined_score > 0.4 |
|
|
|
|
|
return False |
|
|
|
|
|
def expand_to_boundaries(self, tokens: List[int], center_pos: int, max_expansion: int = 500) -> Tuple[int, int]: |
|
|
"""Naturally expand from center until hitting natural boundaries. |
|
|
|
|
|
This method expands from a center position in both directions until |
|
|
semantic boundaries are encountered. |
|
|
|
|
|
Args: |
|
|
tokens: List of token IDs |
|
|
center_pos: Starting position for expansion |
|
|
max_expansion: Maximum number of tokens to expand in each direction |
|
|
|
|
|
Returns: |
|
|
Tuple containing start and end positions for the chunk |
|
|
""" |
|
|
start_pos = center_pos |
|
|
end_pos = center_pos |
|
|
|
|
|
|
|
|
left_expansion = 0 |
|
|
while start_pos > 0 and left_expansion < max_expansion: |
|
|
prev_pos = start_pos - 1 |
|
|
if self.is_semantic_boundary(tokens[prev_pos], tokens, prev_pos): |
|
|
break |
|
|
start_pos = prev_pos |
|
|
left_expansion += 1 |
|
|
|
|
|
|
|
|
right_expansion = 0 |
|
|
while end_pos < len(tokens) - 1 and right_expansion < max_expansion: |
|
|
next_pos = end_pos + 1 |
|
|
if self.is_semantic_boundary(tokens[next_pos], tokens, next_pos): |
|
|
break |
|
|
end_pos = next_pos |
|
|
right_expansion += 1 |
|
|
|
|
|
return start_pos, end_pos + 1 |
|
|
|
|
|
def create_radial_chunks(self, tokens: List[int], centers: List[Dict[str, Any]], max_expansion: int = 500) -> List[Dict[str, Any]]: |
|
|
"""Slice corpus at midpoints between 99th percentile high-entropy tokens. |
|
|
|
|
|
This method creates chunks by slicing the text at midpoints between |
|
|
high-entropy token centers. |
|
|
|
|
|
Args: |
|
|
tokens: List of all token IDs in the document |
|
|
centers: List of high-entropy token centers |
|
|
max_expansion: Maximum expansion for chunk boundaries (not used in this method) |
|
|
|
|
|
Returns: |
|
|
List of chunk dictionaries with position and content information |
|
|
""" |
|
|
self._log_message(f"SLICING corpus at midpoints between {len(centers)} high-entropy centers...") |
|
|
|
|
|
if not centers: |
|
|
return [] |
|
|
|
|
|
|
|
|
sorted_centers = sorted(centers, key=lambda x: x['position']) |
|
|
|
|
|
chunks = [] |
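        # Illustrative example: centers at positions [10, 50, 90] give midpoints 30
        # and 70, so the corpus is partitioned into [0, 31), [31, 71) and
        # [71, len(tokens)) -- every token belongs to exactly one chunk.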
|
|
|
|
|
|
|
|
if len(sorted_centers) > 1: |
|
|
first_center = sorted_centers[0] |
|
|
second_center = sorted_centers[1] |
|
|
midpoint = (first_center['position'] + second_center['position']) // 2 |
|
|
start_pos = 0 |
|
|
end_pos = midpoint + 1 |
|
|
|
|
|
chunk_tokens = tokens[start_pos:end_pos] |
|
|
chunks.append({ |
|
|
'center_pos': first_center['position'], |
|
|
'center_token': first_center['token'], |
|
|
'center_score': first_center['score'], |
|
|
'start_pos': start_pos, |
|
|
'end_pos': end_pos, |
|
|
'tokens': chunk_tokens, |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
|
|
|
for i in range(1, len(sorted_centers) - 1): |
|
|
current_center = sorted_centers[i] |
|
|
prev_center = sorted_centers[i-1] |
|
|
next_center = sorted_centers[i+1] |
|
|
|
|
|
left_midpoint = (prev_center['position'] + current_center['position']) // 2 |
|
|
right_midpoint = (current_center['position'] + next_center['position']) // 2 |
|
|
|
|
|
start_pos = left_midpoint + 1 |
|
|
end_pos = right_midpoint + 1 |
|
|
|
|
|
chunk_tokens = tokens[start_pos:end_pos] |
|
|
chunks.append({ |
|
|
'center_pos': current_center['position'], |
|
|
'center_token': current_center['token'], |
|
|
'center_score': current_center['score'], |
|
|
'start_pos': start_pos, |
|
|
'end_pos': end_pos, |
|
|
'tokens': chunk_tokens, |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
|
|
|
if len(sorted_centers) > 1: |
|
|
last_center = sorted_centers[-1] |
|
|
prev_center = sorted_centers[-2] |
|
|
midpoint = (prev_center['position'] + last_center['position']) // 2 |
|
|
start_pos = midpoint + 1 |
|
|
end_pos = len(tokens) |
|
|
|
|
|
chunk_tokens = tokens[start_pos:end_pos] |
|
|
chunks.append({ |
|
|
'center_pos': last_center['position'], |
|
|
'center_token': last_center['token'], |
|
|
'center_score': last_center['score'], |
|
|
'start_pos': start_pos, |
|
|
'end_pos': end_pos, |
|
|
'tokens': chunk_tokens, |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
|
|
|
elif len(sorted_centers) == 1: |
|
|
center = sorted_centers[0] |
|
|
chunks.append({ |
|
|
'center_pos': center['position'], |
|
|
'center_token': center['token'], |
|
|
'center_score': center['score'], |
|
|
'start_pos': 0, |
|
|
'end_pos': len(tokens), |
|
|
'tokens': tokens, |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
self._log_message(f" Sliced corpus into {len(chunks)} chunks at high-entropy midpoints") |
|
|
self._log_message(f" Perfect coverage: 100% (corpus fully partitioned)") |
|
|
self._log_message(f" Avg chunk size: {len(tokens)/len(chunks):.1f} tokens") |
|
|
|
|
|
return chunks |
|
|
|
|
|
def semantic_binary_search(self, query: str, max_depth: int = 5) -> List[Dict[str, Any]]: |
|
|
"""Navigate corpus using semantic binary search on 99th percentile entropy tokens. |
|
|
|
|
|
This method performs a binary search in semantic space by analyzing |
|
|
high-entropy tokens in chunks and navigating toward the most relevant |
|
|
content based on query similarity. |
|
|
|
|
|
Args: |
|
|
query: Query string to search for |
|
|
max_depth: Maximum depth for semantic search navigation |
|
|
|
|
|
Returns: |
|
|
List of context chunks around the stopping point |
|
|
""" |
|
|
self._log_message(f"SEMANTIC BINARY SEARCH: '{query}'") |
|
|
self._log_message("=" * 60) |
|
|
|
|
|
|
|
|
query_embedding = self.encode_text(query) |
|
|
|
|
|
|
|
|
with self.get_connection() as conn: |
|
|
cursor = conn.execute(""" |
|
|
SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding |
|
|
FROM documents |
|
|
ORDER BY chunk_id |
|
|
""") |
|
|
|
|
|
chunks = [] |
|
|
for row in cursor: |
|
|
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) |
|
|
similarity = self.cosine_similarity(query_embedding, doc_embedding) |
|
|
chunks.append({ |
|
|
'chunk_id': row['chunk_id'], |
|
|
'center_token': row['center_token'], |
|
|
'center_score': row['entropy_score'], |
|
|
'token_start': row['token_start'], |
|
|
'token_end': row['token_end'], |
|
|
'content': row['content'], |
|
|
'similarity': similarity |
|
|
}) |
|
|
|
|
|
|
|
|
chunks.sort(key=lambda x: x['similarity'], reverse=True) |
|
|
|
|
|
if not chunks: |
|
|
self._log_message("No chunks found for semantic search") |
|
|
return [] |
|
|
|
|
|
|
|
|
current_chunk = chunks[0] |
|
|
path = [current_chunk] |
|
|
self._log_message(f"Starting at Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})") |
|
|
self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})") |
|
|
|
|
|
for depth in range(max_depth): |
|
|
self._log_message(f"\nDepth {depth + 1}: Analyzing Chunk {current_chunk['chunk_id']}") |
|
|
|
|
|
|
|
|
chunk_tokens = [] |
|
|
with self.get_connection() as conn: |
|
|
cursor = conn.execute(""" |
|
|
SELECT content, token_start, token_end FROM documents WHERE chunk_id = ? |
|
|
""", (current_chunk['chunk_id'],)) |
|
|
row = cursor.fetchone() |
|
|
if row: |
|
|
|
|
|
chunk_text = row['content'] |
|
|
chunk_tokens = self.tokenizer.encode(chunk_text, add_special_tokens=False) |
|
|
|
|
|
current_chunk['token_start'] = row['token_start'] |
|
|
current_chunk['token_end'] = row['token_end'] |
|
|
|
|
|
if len(chunk_tokens) < 3: |
|
|
self._log_message(f" Chunk too small for binary navigation") |
|
|
break |
|
|
|
|
|
|
|
|
            encoded_center = self.tokenizer.encode(current_chunk['center_token'], add_special_tokens=False)
            center_token_id = encoded_center[0] if encoded_center else None
|
|
|
|
|
if center_token_id is None: |
|
|
self._log_message(f" Cannot find center token in chunk") |
|
|
break |
|
|
|
|
|
try: |
|
|
center_pos = chunk_tokens.index(center_token_id) |
|
|
except ValueError: |
|
|
center_pos = len(chunk_tokens) // 2 |
|
|
self._log_message(f" Center token not found, using middle position {center_pos}") |
|
|
|
|
|
|
|
|
left_tokens = chunk_tokens[:center_pos] |
|
|
right_tokens = chunk_tokens[center_pos + 1:] |
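            # Score each half with the same entropy formula as calculate_token_entropy,
            # keep only its 99th-percentile tokens, and move toward whichever side is
            # more similar to the query (similarity weighted by entropy score).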
|
|
|
|
|
self._log_message(f" Left section: {len(left_tokens)} tokens") |
|
|
self._log_message(f" Right section: {len(right_tokens)} tokens") |
|
|
|
|
|
|
|
|
left_entropy_scores = [] |
|
|
for i, token_id in enumerate(left_tokens): |
|
|
if token_id < len(self.embeddings): |
|
|
vector = self.embeddings[token_id] |
|
|
normalized_vector = vector / np.linalg.norm(vector) |
|
|
epsilon = 1e-10 |
|
|
                    entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
|
|
variance = np.var(vector) |
|
|
|
|
|
if token_id < len(self.weights): |
|
|
weight_value = self.weights[token_id].item() |
|
|
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 |
|
|
else: |
|
|
rarity_multiplier = 1.0 |
|
|
entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier |
|
|
left_entropy_scores.append((i, token_id, entropy_score)) |
|
|
|
|
|
right_entropy_scores = [] |
|
|
for i, token_id in enumerate(right_tokens): |
|
|
if token_id < len(self.embeddings): |
|
|
vector = self.embeddings[token_id] |
|
|
normalized_vector = vector / np.linalg.norm(vector) |
|
|
epsilon = 1e-10 |
|
|
                    entropy = -np.sum(np.abs(normalized_vector) * np.log2(np.abs(normalized_vector) + epsilon))
|
|
variance = np.var(vector) |
|
|
|
|
|
if token_id < len(self.weights): |
|
|
weight_value = self.weights[token_id].item() |
|
|
rarity_multiplier = 1.0 + (weight_value / 127.0) * 1.0 |
|
|
else: |
|
|
rarity_multiplier = 1.0 |
|
|
entropy_score = entropy * (1.0 + variance * 0.1) * rarity_multiplier |
|
|
right_entropy_scores.append((i, token_id, entropy_score)) |
|
|
|
|
|
|
|
|
if left_entropy_scores: |
|
|
left_scores = [score for _, _, score in left_entropy_scores] |
|
|
left_threshold = np.percentile(left_scores, 99) if len(left_scores) > 0 else 0 |
|
|
left_high_entropy = [(pos, token_id, score) for pos, token_id, score in left_entropy_scores if score >= left_threshold] |
|
|
else: |
|
|
left_high_entropy = [] |
|
|
left_threshold = 0 |
|
|
|
|
|
if right_entropy_scores: |
|
|
right_scores = [score for _, _, score in right_entropy_scores] |
|
|
right_threshold = np.percentile(right_scores, 99) if len(right_scores) > 0 else 0 |
|
|
right_high_entropy = [(pos, token_id, score) for pos, token_id, score in right_entropy_scores if score >= right_threshold] |
|
|
else: |
|
|
right_high_entropy = [] |
|
|
right_threshold = 0 |
|
|
|
|
|
self._log_message(f" Left 99th percentile threshold: {left_threshold:.2f} ({len(left_high_entropy)} tokens)") |
|
|
self._log_message(f" Right 99th percentile threshold: {right_threshold:.2f} ({len(right_high_entropy)} tokens)") |
|
|
|
|
|
if not left_high_entropy and not right_high_entropy: |
|
|
self._log_message(f" No high-entropy tokens found - search complete") |
|
|
break |
|
|
|
|
|
|
|
|
left_similarity_sum = 0 |
|
|
right_similarity_sum = 0 |
|
|
|
|
|
for pos, token_id, score in left_high_entropy: |
|
|
if token_id < len(self.embeddings): |
|
|
token_embedding = self.embeddings[token_id] |
|
|
similarity = self.cosine_similarity(query_embedding, token_embedding) |
|
|
left_similarity_sum += similarity * score |
|
|
|
|
|
for pos, token_id, score in right_high_entropy: |
|
|
if token_id < len(self.embeddings): |
|
|
token_embedding = self.embeddings[token_id] |
|
|
similarity = self.cosine_similarity(query_embedding, token_embedding) |
|
|
right_similarity_sum += similarity * score |
|
|
|
|
|
self._log_message(f" Left query similarity: {left_similarity_sum:.3f}") |
|
|
self._log_message(f" Right query similarity: {right_similarity_sum:.3f}") |
|
|
|
|
|
|
|
|
if left_similarity_sum > right_similarity_sum: |
|
|
direction = "LEFT" |
|
|
self._log_message(f" Query more similar to LEFT section") |
|
|
elif right_similarity_sum > left_similarity_sum: |
|
|
direction = "RIGHT" |
|
|
self._log_message(f" Query more similar to RIGHT section") |
|
|
else: |
|
|
direction = "EQUAL" |
|
|
self._log_message(f" Equal similarity - search complete") |
|
|
break |
|
|
|
|
|
|
|
|
with self.get_connection() as conn: |
|
|
if direction == "LEFT": |
|
|
cursor = conn.execute(""" |
|
|
SELECT chunk_id, center_token, entropy_score, content, embedding |
|
|
FROM documents |
|
|
WHERE token_end < ? |
|
|
ORDER BY token_end DESC |
|
|
LIMIT 1 |
|
|
""", (current_chunk['token_start'],)) |
|
|
else: |
|
|
cursor = conn.execute(""" |
|
|
SELECT chunk_id, center_token, entropy_score, content, embedding |
|
|
FROM documents |
|
|
WHERE token_start > ? |
|
|
ORDER BY token_start ASC |
|
|
LIMIT 1 |
|
|
""", (current_chunk['token_end'],)) |
|
|
|
|
|
next_row = cursor.fetchone() |
|
|
if next_row: |
|
|
next_embedding = np.frombuffer(next_row['embedding'], dtype=np.float32) |
|
|
next_similarity = self.cosine_similarity(query_embedding, next_embedding) |
|
|
|
|
|
current_chunk = { |
|
|
'chunk_id': next_row['chunk_id'], |
|
|
'center_token': next_row['center_token'], |
|
|
'center_score': next_row['entropy_score'], |
|
|
'content': next_row['content'], |
|
|
'similarity': next_similarity |
|
|
} |
|
|
|
|
|
path.append(current_chunk) |
|
|
self._log_message(f" Moved {direction} to Chunk {current_chunk['chunk_id']} (similarity: {current_chunk['similarity']:.3f})") |
|
|
self._log_message(f" Center: '{current_chunk['center_token']}' (entropy: {current_chunk['center_score']:.2f})") |
|
|
self._log_message(f" Content: \"{current_chunk['content'][:100]}...\"") |
|
|
else: |
|
|
self._log_message(f" No more chunks in {direction} direction") |
|
|
break |
|
|
|
|
|
self._log_message(f"\nSEMANTIC SEARCH PATH:") |
|
|
self._log_message("-" * 40) |
|
|
for i, chunk in enumerate(path): |
|
|
self._log_message(f"Step {i}: Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") |
|
|
self._log_message(f" Center: '{chunk['center_token']}' - \"{chunk['content'][:80]}...\"") |
|
|
|
|
|
|
|
|
return self.get_context_window(current_chunk, query_embedding, target_tokens=512) |
|
|
|
|
|
    def get_context_window(self, center_chunk: Dict[str, Any], query_embedding: np.ndarray, target_tokens: int = 512) -> List[Dict[str, Any]]:
        """Collect surrounding chunks around the stopping point until the target token count is reached.

        Args:
            center_chunk: Chunk dictionary where the semantic binary search stopped
            query_embedding: Embedding vector of the query
            target_tokens: Approximate number of tokens to gather around the center chunk

        Returns:
            List of context chunks sorted by similarity to the query
        """
        self._log_message(f"GETTING CONTEXT WINDOW: ~{target_tokens} tokens around Chunk {center_chunk['chunk_id']}")
        self._log_message("=" * 60)
|
|
|
|
|
with self.get_connection() as conn: |
|
|
|
|
|
cursor = conn.execute(""" |
|
|
SELECT chunk_id, center_token, entropy_score, token_start, token_end, content, embedding |
|
|
FROM documents |
|
|
ORDER BY chunk_id |
|
|
""") |
|
|
|
|
|
all_chunks = [] |
|
|
for row in cursor: |
|
|
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) |
|
|
similarity = self.cosine_similarity(query_embedding, doc_embedding) |
|
|
tokens_in_chunk = row['token_end'] - row['token_start'] |
|
|
all_chunks.append({ |
|
|
'chunk_id': row['chunk_id'], |
|
|
'center_token': row['center_token'], |
|
|
'center_score': row['entropy_score'], |
|
|
'token_start': row['token_start'], |
|
|
'token_end': row['token_end'], |
|
|
'content': row['content'], |
|
|
'similarity': similarity, |
|
|
'tokens_count': tokens_in_chunk, |
|
|
'direction': 'CENTER' if row['chunk_id'] == center_chunk['chunk_id'] else None |
|
|
}) |
|
|
|
|
|
|
|
|
center_idx = None |
|
|
for i, chunk in enumerate(all_chunks): |
|
|
if chunk['chunk_id'] == center_chunk['chunk_id']: |
|
|
center_idx = i |
|
|
chunk['direction'] = 'CENTER' |
|
|
break |
|
|
|
|
|
if center_idx is None: |
|
|
self._log_message("Center chunk not found in all chunks list") |
|
|
return [] |
|
|
|
|
|
|
|
|
context_chunks = [all_chunks[center_idx]] |
|
|
total_tokens = all_chunks[center_idx]['tokens_count'] |
|
|
|
|
|
left_idx = center_idx - 1 |
|
|
right_idx = center_idx + 1 |
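        # Grow the window outward from the center chunk, preferring whichever
        # neighbour adds fewer tokens, until roughly target_tokens are collected.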
|
|
|
|
|
while total_tokens < target_tokens: |
|
|
|
|
|
left_available = left_idx >= 0 |
|
|
right_available = right_idx < len(all_chunks) |
|
|
|
|
|
if not left_available and not right_available: |
|
|
break |
|
|
|
|
|
|
|
|
if left_available and not right_available: |
|
|
next_chunk = all_chunks[left_idx] |
|
|
left_idx -= 1 |
|
|
elif right_available and not left_available: |
|
|
next_chunk = all_chunks[right_idx] |
|
|
right_idx += 1 |
|
|
else: |
|
|
|
|
|
left_chunk = all_chunks[left_idx] |
|
|
right_chunk = all_chunks[right_idx] |
|
|
|
|
|
if left_chunk['tokens_count'] <= right_chunk['tokens_count']: |
|
|
next_chunk = left_chunk |
|
|
left_idx -= 1 |
|
|
else: |
|
|
next_chunk = right_chunk |
|
|
right_idx += 1 |
|
|
|
|
|
|
|
|
direction = 'BEFORE' if next_chunk['chunk_id'] < center_chunk['chunk_id'] else 'AFTER' |
|
|
next_chunk['direction'] = direction |
|
|
|
|
|
context_chunks.append(next_chunk) |
|
|
total_tokens += next_chunk['tokens_count'] |
|
|
|
|
|
|
|
|
context_chunks_sorted = sorted(context_chunks, key=lambda x: x['similarity'], reverse=True) |
|
|
|
|
|
self._log_message(f"CONTEXT WINDOW RESULTS:") |
|
|
self._log_message(f" Total chunks: {len(context_chunks)}") |
|
|
self._log_message(f" Total tokens: {total_tokens}") |
|
|
self._log_message(f" Range: Chunk {min(c['chunk_id'] for c in context_chunks)} to Chunk {max(c['chunk_id'] for c in context_chunks)}") |
|
|
self._log_message(f" Center: Chunk {center_chunk['chunk_id']} ('{center_chunk['center_token']}')") |
|
|
|
|
|
self._log_message(f"\nTOP 10 MOST SIMILAR CHUNKS IN CONTEXT:") |
|
|
self._log_message("-" * 50) |
|
|
for i, chunk in enumerate(context_chunks_sorted[:10], 1): |
|
|
self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") |
|
|
self._log_message(f" Direction: {chunk['direction']}, Tokens: {chunk['tokens_count']}") |
|
|
self._log_message(f" Center: '{chunk['center_token']}' (entropy: {chunk['center_score']:.2f})") |
|
|
self._log_message(f" Content: \"{chunk['content'][:100]}...\"") |
|
|
self._log_message("") |
|
|
|
|
|
|
|
|
keywords = ['radar', 'cavity', 'microwave', 'waveguide', 'standing', 'waves', 'zero-point', 'energy'] |
|
|
keyword_matches = [] |
|
|
|
|
|
for chunk in context_chunks: |
|
|
content_lower = chunk['content'].lower() |
|
|
for keyword in keywords: |
|
|
if keyword in content_lower: |
|
|
if chunk not in keyword_matches: |
|
|
keyword_matches.append(chunk) |
|
|
break |
|
|
|
|
|
if keyword_matches: |
|
|
self._log_message(f"\nKEYWORD MATCHES FOUND:") |
|
|
self._log_message("-" * 40) |
|
|
for i, chunk in enumerate(keyword_matches[:5], 1): |
|
|
self._log_message(f"{i}. Chunk {chunk['chunk_id']} (similarity: {chunk['similarity']:.3f})") |
|
|
|
|
|
content_lower = chunk['content'].lower() |
|
|
matched_keywords = [kw for kw in keywords if kw in content_lower] |
|
|
self._log_message(f" Keywords: {', '.join(matched_keywords)}") |
|
|
self._log_message(f" Content: \"{chunk['content'][:150]}...\"") |
|
|
self._log_message("") |
|
|
|
|
|
self._log_message(f"CONTEXT WINDOW SEARCH COMPLETE") |
|
|
self._log_message(f" Analyzed {len(context_chunks)} chunks (~{total_tokens} tokens) around the semantic search stopping point") |
|
|
|
|
|
return context_chunks_sorted |
|
|
|
|
|
def create_fallback_chunks(self, tokens: List[int], unassigned_positions: List[int], start_chunk_id: int, fallback_size: int = 64) -> List[Dict[str, Any]]: |
|
|
"""Create chunks for unassigned tokens to ensure complete coverage. |
|
|
|
|
|
This method creates fallback chunks for any tokens that weren't assigned |
|
|
to high-entropy center-based chunks, ensuring complete document coverage. |
|
|
|
|
|
Args: |
|
|
tokens: List of all token IDs in the document |
|
|
unassigned_positions: List of token positions that weren't assigned to chunks |
|
|
start_chunk_id: Initial chunk ID to use for the fallback chunks |
|
|
fallback_size: Size of each fallback chunk |
|
|
|
|
|
Returns: |
|
|
List of fallback chunk dictionaries |
|
|
""" |
|
|
fallback_chunks = [] |
|
|
|
|
|
|
|
|
groups = [] |
|
|
current_group = [] |
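        # Group consecutive unassigned positions into contiguous runs, then split
        # each run into fallback chunks of at most `fallback_size` tokens.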
|
|
|
|
|
for pos in unassigned_positions: |
|
|
if not current_group or pos == current_group[-1] + 1: |
|
|
current_group.append(pos) |
|
|
else: |
|
|
groups.append(current_group) |
|
|
current_group = [pos] |
|
|
if current_group: |
|
|
groups.append(current_group) |
|
|
|
|
|
|
|
|
for group in groups: |
|
|
for i in range(0, len(group), fallback_size): |
|
|
chunk_start = group[i] |
|
|
chunk_end = group[min(i + fallback_size - 1, len(group) - 1)] |
|
|
chunk_tokens = tokens[chunk_start:chunk_end + 1] |
|
|
|
|
|
fallback_chunks.append({ |
|
|
'center_pos': (chunk_start + chunk_end) // 2, |
|
|
'center_token': 'fallback', |
|
|
'center_score': 0.0, |
|
|
'start_pos': chunk_start, |
|
|
'end_pos': chunk_end + 1, |
|
|
'tokens': chunk_tokens, |
|
|
'chunk_id': start_chunk_id + len(fallback_chunks), |
|
|
'radius': (chunk_end - chunk_start + 1) // 2, |
|
|
'adaptive_radius': False, |
|
|
'fallback': True, |
|
|
'local_density': 0.0 |
|
|
}) |
|
|
|
|
|
return fallback_chunks |
|
|
|
|
|
def process_text_file(self, file_path: str, entropy_percentile: int = 85, chunk_radius: int = 100, enable_adaptive: bool = True) -> None: |
|
|
"""Process text file with improved entropy-based radial chunking. |
|
|
|
|
|
This method processes a text file by tokenizing it, calculating entropy |
|
|
scores for each token, finding high-entropy centers, and creating |
|
|
radial chunks around these centers. |
|
|
|
|
|
Args: |
|
|
file_path: Path to the text file to process |
|
|
entropy_percentile: Percentile for selecting high-entropy tokens |
|
|
chunk_radius: Base radius for chunks |
|
|
            enable_adaptive: Whether to enable adaptive chunking (currently not used by this method)
|
|
""" |
|
|
self._log_message(f"Processing: {file_path}") |
|
|
self._log_message(f" Entropy percentile: {entropy_percentile}%, Base radius: {chunk_radius}, Adaptive: {enable_adaptive}") |
|
|
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: |
|
|
text = f.read() |
|
|
|
|
|
|
|
|
self._log_message("Tokenizing text...") |
|
|
tokens = self.tokenizer.encode(text, add_special_tokens=False) |
|
|
self._log_message(f" Total tokens: {len(tokens):,}") |
|
|
|
|
|
|
|
|
entropy_scores = self.calculate_token_entropy(tokens) |
|
|
|
|
|
|
|
|
centers, threshold = self.find_entropy_centers(entropy_scores, entropy_percentile) |
|
|
|
|
|
|
|
|
chunks = self.create_radial_chunks(tokens, centers, max_expansion=chunk_radius) |
|
|
|
|
|
|
|
|
with self.get_connection() as conn: |
|
|
conn.execute("DELETE FROM documents") |
|
|
conn.execute("DELETE FROM entropy_centers") |
|
|
conn.commit() |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
with self.get_connection() as conn: |
|
|
for chunk in chunks: |
|
|
|
|
|
chunk_text = self.tokenizer.decode(chunk['tokens']) |
|
|
|
|
|
|
|
|
embedding = self.encode_text(chunk_text) |
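                # The chunk embedding is stored as raw float32 bytes via tobytes() and
                # decoded later with np.frombuffer(..., dtype=np.float32) during search.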
|
|
|
|
|
|
|
|
conn.execute(""" |
|
|
INSERT INTO documents |
|
|
(chunk_id, content, token_start, token_end, center_token, entropy_score, |
|
|
radius, adaptive_radius, local_density, fallback_chunk, embedding) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
chunk['chunk_id'], |
|
|
chunk_text, |
|
|
chunk['start_pos'], |
|
|
chunk['end_pos'], |
|
|
chunk['center_token'], |
|
|
chunk['center_score'], |
|
|
chunk.get('radius', 0), |
|
|
chunk.get('adaptive_radius', False), |
|
|
chunk.get('local_density', 0.0), |
|
|
chunk.get('fallback', False), |
|
|
embedding.tobytes() |
|
|
)) |
|
|
|
|
|
|
|
|
for center in centers: |
|
|
conn.execute(""" |
|
|
INSERT INTO entropy_centers |
|
|
(filename, token_position, token_text, entropy_score) |
|
|
VALUES (?, ?, ?, ?) |
|
|
""", (os.path.basename(file_path), center['position'], center['token'], center['score'])) |
|
|
|
|
|
|
|
|
avg_chunk_size = sum(len(c['tokens']) for c in chunks) / len(chunks) if chunks else 0 |
|
|
conn.execute(""" |
|
|
INSERT OR REPLACE INTO entropy_analysis |
|
|
(filename, total_tokens, total_chunks, entropy_threshold, high_entropy_tokens, avg_chunk_size) |
|
|
VALUES (?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
os.path.basename(file_path), |
|
|
len(tokens), |
|
|
len(chunks), |
|
|
threshold, |
|
|
len(centers), |
|
|
avg_chunk_size |
|
|
)) |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
elapsed = time.time() - start_time |
|
|
self._log_message(f"Processing complete in {elapsed:.2f}s") |
|
|
if chunks: |
|
|
self._log_message(f" Avg time per chunk: {elapsed/len(chunks)*1000:.2f}ms") |
|
|
else: |
|
|
self._log_message(" Avg time per chunk: N/A") |
|
|
|
|
|
def encode_text(self, text: str) -> np.ndarray: |
|
|
"""Encode text using mean pooling. |
|
|
|
|
|
This method converts text to embeddings by tokenizing it and then |
|
|
taking the mean of the token embeddings. |
|
|
|
|
|
Args: |
|
|
text: Input text to encode |
|
|
|
|
|
Returns: |
|
|
Normalized embedding vector |
|
|
""" |
|
|
tokens = self.tokenizer.encode(text, add_special_tokens=False) |
|
|
if not tokens: |
|
|
            return np.zeros(self.embeddings.shape[1], dtype=np.float32)
|
|
|
|
|
token_embeddings = [] |
|
|
for token_id in tokens: |
|
|
if token_id < len(self.embeddings): |
|
|
token_embeddings.append(self.embeddings[token_id]) |
|
|
|
|
|
if not token_embeddings: |
|
|
            return np.zeros(self.embeddings.shape[1], dtype=np.float32)
|
|
|
|
|
token_embeddings = np.array(token_embeddings, dtype=np.float32) |
|
|
final_embedding = np.mean(token_embeddings, axis=0) |
|
|
final_embedding = final_embedding / np.linalg.norm(final_embedding) |
|
|
return final_embedding |
|
|
|
|
|
def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float: |
|
|
"""Calculate cosine similarity between two vectors. |
|
|
|
|
|
Args: |
|
|
a: First vector |
|
|
b: Second vector |
|
|
|
|
|
Returns: |
|
|
Cosine similarity value between -1 and 1 |
|
|
""" |
|
|
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) |
|
|
|
|
|
def search(self, query: str, limit: int = 10, min_similarity: float = 0.1) -> List[Dict[str, Any]]: |
|
|
"""Search with entropy-aware results. |
|
|
|
|
|
This method performs a similarity search for the given query and |
|
|
returns the most relevant text chunks based on embedding similarity. |
|
|
|
|
|
Args: |
|
|
query: Query string to search for |
|
|
limit: Maximum number of results to return |
|
|
min_similarity: Minimum similarity threshold for results |
|
|
|
|
|
Returns: |
|
|
List of result dictionaries containing chunk information |
|
|
""" |
|
|
self._log_message(f"Searching for: '{query}'") |
|
|
|
|
|
|
|
|
query_embedding = self.encode_text(query) |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
with self.get_connection() as conn: |
|
|
cursor = conn.execute(""" |
|
|
SELECT chunk_id, content, token_start, token_end, center_token, entropy_score, |
|
|
radius, adaptive_radius, local_density, fallback_chunk, embedding |
|
|
FROM documents |
|
|
ORDER BY chunk_id |
|
|
""") |
|
|
|
|
|
results = [] |
|
|
for row in cursor: |
|
|
|
|
|
doc_embedding = np.frombuffer(row['embedding'], dtype=np.float32) |
|
|
|
|
|
|
|
|
similarity = self.cosine_similarity(query_embedding, doc_embedding) |
|
|
|
|
|
if similarity >= min_similarity: |
|
|
results.append({ |
|
|
'chunk_id': row['chunk_id'], |
|
|
'content': row['content'][:200] + '...' if len(row['content']) > 200 else row['content'], |
|
|
'full_content': row['content'], |
|
|
'token_start': row['token_start'], |
|
|
'token_end': row['token_end'], |
|
|
'center_token': row['center_token'], |
|
|
'entropy_score': row['entropy_score'], |
|
|
'radius': row['radius'], |
|
|
'adaptive_radius': bool(row['adaptive_radius']), |
|
|
'local_density': row['local_density'], |
|
|
'fallback': bool(row['fallback_chunk']), |
|
|
'similarity': similarity |
|
|
}) |
|
|
|
|
|
|
|
|
results.sort(key=lambda x: x['similarity'], reverse=True) |
|
|
results = results[:limit] |
|
|
|
|
|
search_time = time.time() - start_time |
|
|
|
|
|
|
|
|
self._log_message(f"Found {len(results)} results in {search_time:.3f}s:") |
|
|
self._log_message("-" * 80) |
|
|
|
|
|
for i, result in enumerate(results): |
|
|
chunk_type = "Fallback" if result['center_token'] == 'fallback' else "Radial" |
|
|
radius_info = f"radius: {result.get('radius', 'N/A')}" if 'radius' in result else "" |
|
|
density_info = f"density: {result.get('local_density', 0.0):.3f}" if 'local_density' in result else "" |
|
|
|
|
|
extra_info = [] |
|
|
if radius_info: |
|
|
extra_info.append(radius_info) |
|
|
if density_info: |
|
|
extra_info.append(density_info) |
|
|
if result.get('adaptive_radius', False): |
|
|
extra_info.append("adaptive") |
|
|
|
|
|
info_str = f" ({', '.join(extra_info)})" if extra_info else "" |
|
|
|
|
|
self._log_message(f"{i+1}. [{result['similarity']:.3f}] {chunk_type} Chunk {result['chunk_id']} (center: '{result['center_token']}', entropy: {result['entropy_score']:.2f}){info_str}") |
|
|
self._log_message(f" {result['content']}") |
|
|
self._log_message("") |
|
|
|
|
|
return results |
|
|
|
|
|
def get_entropy_stats(self) -> None: |
|
|
"""Get entropy analysis statistics. |
|
|
|
|
|
This method retrieves and displays statistics about the entropy-based |
|
|
chunking process, including document counts, entropy centers, and |
|
|
processing information. |
|
|
""" |
|
|
with self.get_connection() as conn: |
|
|
|
|
|
doc_stats = conn.execute("SELECT COUNT(*) as count FROM documents").fetchone() |
|
|
|
|
|
|
|
|
entropy_stats = conn.execute(""" |
|
|
SELECT filename, total_tokens, total_chunks, entropy_threshold, |
|
|
high_entropy_tokens, avg_chunk_size, processed_at |
|
|
FROM entropy_analysis |
|
|
ORDER BY processed_at DESC |
|
|
LIMIT 1 |
|
|
""").fetchone() |
|
|
|
|
|
|
|
|
top_centers = conn.execute(""" |
|
|
SELECT token_text, entropy_score, chunk_assigned |
|
|
FROM entropy_centers |
|
|
ORDER BY entropy_score DESC |
|
|
LIMIT 10 |
|
|
""").fetchall() |
|
|
|
|
|
self._log_message("Entropy-Based Search Statistics:") |
|
|
self._log_message(f" Total chunks: {doc_stats['count']:,}") |
|
|
|
|
|
if entropy_stats: |
|
|
self._log_message(f" File: {entropy_stats['filename']}") |
|
|
self._log_message(f" Total tokens: {entropy_stats['total_tokens']:,}") |
|
|
self._log_message(f" Entropy centers: {entropy_stats['high_entropy_tokens']:,}") |
|
|
self._log_message(f" Entropy threshold: {entropy_stats['entropy_threshold']:.3f}") |
|
|
self._log_message(f" Avg chunk size: {entropy_stats['avg_chunk_size']:.1f} tokens") |
|
|
self._log_message(f" Processed: {entropy_stats['processed_at']}") |
|
|
|
|
|
self._log_message(f"\nTop 10 High-Entropy Centers:") |
|
|
for i, center in enumerate(top_centers, 1): |
|
|
assigned = "✓" if center['chunk_assigned'] else "✗" |
|
|
self._log_message(f" {i:2d}. [{center['entropy_score']:.2f}] '{center['token_text']}' {assigned}") |
|
|
|
|
|
def main(): |
|
|
"""Main function to demonstrate the EntropyRadialSearch functionality. |
|
|
|
|
|
This function initializes the search engine, processes a sample file if |
|
|
available, and performs test searches to demonstrate the capabilities |
|
|
of the entropy-based radial chunking system. |
|
|
""" |
|
|
print("Entropy-Based Radial Search Engine") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
search_engine = EntropyRadialSearch() |
|
|
|
|
|
|
|
|
sam_file = "sam.txt" |
|
|
if os.path.exists(sam_file): |
|
|
print(f"Found {sam_file} ({os.path.getsize(sam_file)/1024/1024:.1f} MB)") |
|
|
|
|
|
|
|
|
search_engine.process_text_file(sam_file, entropy_percentile=99, chunk_radius=100, enable_adaptive=True) |
|
|
|
|
|
|
|
|
search_engine.get_entropy_stats() |
|
|
|
|
|
|
|
|
test_queries = [ |
|
|
"Steven Lamoreaux Casimir experiment", |
|
|
"Jahn Teller effect dynamic", |
|
|
"quantum mechanics entanglement", |
|
|
"Casimir force measurement 1997" |
|
|
] |
|
|
|
|
|
print(f"\nTesting entropy-based search:") |
|
|
print("-" * 50) |
|
|
|
|
|
for query in test_queries: |
|
|
search_engine.search(query, limit=3) |
|
|
print() |
|
|
|
|
|
print("Entropy-Based Search Engine Test Complete!") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |