import re
import os

from bs4 import BeautifulSoup
import tiktoken


class ChapterSplitter:
    """Split large chapters into smaller chunks while preserving structure"""

    def __init__(self, model_name="gpt-3.5-turbo", target_tokens=80000, compression_factor=1.0):
        """
        Initialize splitter with token counter

        target_tokens: Target size for each chunk (leaving room for system prompt & history)
        compression_factor: Expected compression ratio from source to target language (0.7-1.0)
        """
        try:
            self.enc = tiktoken.encoding_for_model(model_name)
        except KeyError:
            # Unknown model name: fall back to a general-purpose encoding
            self.enc = tiktoken.get_encoding("cl100k_base")
        self.target_tokens = target_tokens
        self.compression_factor = compression_factor

    def count_tokens(self, text):
        """Count tokens in text (strips data URIs to avoid huge overestimates)."""
        try:
            # Remove/shorten base64 data URIs (e.g., data:image/png;base64,AAAA...).
            # They are not meaningful for token budgeting and can explode counts.
            text = re.sub(
                r'data:image/[^;]+;base64,[A-Za-z0-9+/=]+',
                'data:image;base64,',
                text,
            )
            return len(self.enc.encode(text))
        except Exception:
            # Fallback estimation: roughly 4 characters per token
            return len(text) // 4
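    # Illustrative example (assumed input, not from the pipeline): for a chapter
    # such as '<p>Hi</p><img src="data:image/png;base64,iVBORw0KGgo...">',
    # count_tokens() collapses the base64 payload to 'data:image;base64,' before
    # encoding, so an embedded image costs a handful of tokens in the budget
    # rather than thousands.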
    def split_chapter(self, chapter_html, max_tokens=None, filename=None):
        """
        Split a chapter into smaller chunks.

        Splits on EITHER the token limit OR the line/element count limit,
        whichever is reached first.

        Args:
            chapter_html: The chapter content (HTML or plain text)
            max_tokens: Maximum tokens per chunk
            filename: Optional filename to help determine content type

        Returns:
            List of (chunk_html, chunk_index, total_chunks)
        """
        if max_tokens is None:
            max_tokens = self.target_tokens
        effective_max_tokens = max_tokens

        # Check for break-split configuration (skipped for PDF files).
        # The case-insensitive substring check catches both plain .pdf paths
        # and PDF-derived chapter names, so a separate endswith test is redundant.
        is_pdf_file = bool(filename) and '.pdf' in filename.lower()
        break_split = os.getenv('BREAK_SPLIT_COUNT', '')
        max_elements = None
        if break_split and break_split.isdigit() and not is_pdf_file:
            max_elements = int(break_split)
            print(f"✅ Break split enabled: {max_elements} lines/elements per chunk")

        # First check whether splitting is needed at all
        total_tokens = self.count_tokens(chapter_html)

        if os.getenv('DEBUG_CHUNK_SPLITTING', '0') == '1':
            print(f"[CHUNK DEBUG] Total tokens: {total_tokens:,}")
            print(f"[CHUNK DEBUG] Effective max tokens: {effective_max_tokens:,}")
            print(f"[CHUNK DEBUG] Max elements per chunk: {max_elements if max_elements else 'None (token-only)'}")
            print(f"[CHUNK DEBUG] Needs split: {total_tokens > effective_max_tokens}")

        if total_tokens <= effective_max_tokens and max_elements is None:
            return [(chapter_html, 1, 1)]  # No split needed

        # Determine whether the content is plain text from the filename
        # extension. This must happen BEFORE any HTML parsing.
        is_plain_text_file = False
        if filename:
            is_plain_text_file = any(
                filename.lower().endswith(suffix) for suffix in ('.csv', '.json', '.txt')
            )
            if is_plain_text_file and max_elements and not is_pdf_file:
                print("📄 Detected plain text file format (forcing line-based splitting)")

        # For plain text files, skip HTML parsing and go directly to
        # line-based splitting
        if not is_plain_text_file:
            soup = BeautifulSoup(chapter_html, 'html.parser')
            if soup.body:
                elements = list(soup.body.children)
            else:
                elements = list(soup.children)

            # Check if we have actual HTML tags (not just text) among the
            # non-empty elements. Tags have a truthy .name; NavigableStrings
            # do not, so getattr() is safer than hasattr() here.
            non_empty_elements = [
                elem for elem in elements
                if not (isinstance(elem, str) and elem.strip() == '')
            ]
            has_html_tags = any(getattr(elem, 'name', None) for elem in non_empty_elements)
        else:
            # For plain text files, set these to trigger line-based mode
            has_html_tags = False
            non_empty_elements = []

        # Force plain text mode for .csv/.json/.txt files, OR when there are
        # no HTML tags, OR when there is only a single element
        if is_plain_text_file or not has_html_tags or len(non_empty_elements) <= 1:
            # Plain text mode - split by line count OR token limit
            lines = chapter_html.split('\n')
            if max_elements and not is_pdf_file:
                print(f"📝 Total lines in file: {len(lines):,}")

            # Calculate tokens for all lines first for balanced splitting
            line_tokens = [self.count_tokens(line) for line in lines]

            # Calculate how many chunks we need for balanced distribution
            if max_elements:
                # Element-based splitting
                num_chunks = (len(lines) + max_elements - 1) // max_elements
            else:
                # Token-based splitting - calculate the optimal number of chunks
                num_chunks = (total_tokens + effective_max_tokens - 1) // effective_max_tokens

            if num_chunks == 1:
                return [(chapter_html, 1, 1)]
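            # Worked example (illustrative numbers, not from the source): with
            # total_tokens = 200_000 and effective_max_tokens = 80_000,
            # num_chunks = ceil(200_000 / 80_000) = 3, giving a balanced target
            # of ~66,667 tokens per chunk instead of filling each chunk to the
            # 80k hard cap and leaving a small remainder chunk at the end.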
            # Balanced splitting: distribute lines evenly across chunks
            chunks = []
            target_tokens_per_chunk = total_tokens / num_chunks
            current_lines = []
            current_tokens = 0

            # Pre-compute marker indices for gender-context boundaries
            gender_footer_indices = {
                idx for idx, line in enumerate(lines)
                if line.strip().startswith("=== CONTEXT ") and "END ===" in line
            }

            # Build prefix sums for fast token range queries
            prefix_tokens = [0]
            for tok in line_tokens:
                prefix_tokens.append(prefix_tokens[-1] + tok)

            def tokens_between(start_idx, end_exclusive):
                return prefix_tokens[end_exclusive] - prefix_tokens[start_idx]

            chunk_start = 0
            for i, line in enumerate(lines):
                # Skip lines already consumed by a forward (next-footer) split
                if i < chunk_start:
                    continue

                line_tok = line_tokens[i]

                # Add line to current chunk
                current_lines.append(line)
                current_tokens += line_tok

                # Check if we should end this chunk
                is_last_line = (i == len(lines) - 1)
                chunks_remaining = num_chunks - len(chunks)
                lines_remaining = len(lines) - i - 1

                # Split if:
                # 1. We've reached the target tokens AND enough lines remain
                #    for the remaining chunks
                # 2. OR we've exceeded the max tokens
                # 3. OR this is the last line
                should_split = False
                if current_tokens >= target_tokens_per_chunk and chunks_remaining > 1:
                    # Make sure enough lines remain for the remaining chunks
                    if lines_remaining >= chunks_remaining - 1:
                        should_split = True
                elif current_tokens > effective_max_tokens:
                    should_split = True
                elif is_last_line:
                    should_split = True

                if should_split:
                    # Prefer to split at the gender footer whose token count is
                    # closest to the target
                    prev_footer = max(
                        (idx for idx in gender_footer_indices if chunk_start <= idx <= i),
                        default=None,
                    )
                    next_footer = min(
                        (idx for idx in gender_footer_indices if idx > i),
                        default=None,
                    )

                    # Token count if we snap the boundary to a given footer
                    def tokens_to(idx_inclusive):
                        return tokens_between(chunk_start, idx_inclusive + 1)

                    # Candidate selection: pick the footer whose token count is
                    # closest to the target; a forward footer is allowed only
                    # within 15% above the hard cap.
                    best_split_idx = None
                    best_diff = None
                    target = target_tokens_per_chunk
                    if prev_footer is not None:
                        t_prev = tokens_to(prev_footer)
                        diff = abs(t_prev - target)
                        best_split_idx, best_diff = prev_footer + 1, diff
                    if next_footer is not None:
                        t_next = tokens_to(next_footer)
                        # Allow slight overflow beyond effective_max_tokens
                        if t_next <= effective_max_tokens * 1.15:
                            diff = abs(t_next - target)
                            if best_diff is None or diff < best_diff:
                                best_split_idx, best_diff = next_footer + 1, diff
                    if best_split_idx is None:
                        best_split_idx = i + 1  # fall back to the current boundary

                    chunk_lines = lines[chunk_start:best_split_idx]
                    chunks.append('\n'.join(chunk_lines))

                    # Prepare state for the next chunk (empty when the split
                    # point lies at or beyond the current line)
                    chunk_start = best_split_idx
                    current_lines = lines[chunk_start:i + 1]
                    current_tokens = tokens_between(chunk_start, i + 1) if chunk_start < i + 1 else 0

            # Safety: add any remaining lines
            if current_lines:
                chunks.append('\n'.join(current_lines))

            if not chunks:
                chunks = [chapter_html]

            total_chunks = len(chunks)
            return [(chunk, i + 1, total_chunks) for i, chunk in enumerate(chunks)]

        # HTML mode - balanced split by element tokens (with an optional
        # element cap). Count total elements first if break split is enabled
        # (skipped for PDFs).
        if max_elements and not is_pdf_file:
            total_elements = sum(
                1 for elem in elements
                if not (isinstance(elem, str) and elem.strip() == '')
            )
            print(f"🏷️ Total HTML elements in file: {total_elements:,}")

        # Pre-compute token counts for non-empty elements
        elem_html_list = []
        elem_tokens_list = []
        for element in elements:
            if isinstance(element, str) and element.strip() == '':
                continue
            element_html = str(element)
            tokens = self.count_tokens(element_html)
            elem_html_list.append(element_html)
            elem_tokens_list.append(tokens)

        if not elem_html_list:
            return [(chapter_html, 1, 1)]

        total_elem_tokens = sum(elem_tokens_list)

        # Determine the number of chunks
        if max_elements:
            num_chunks = (len(elem_html_list) + max_elements - 1) // max_elements
        else:
            num_chunks = (total_elem_tokens + effective_max_tokens - 1) // effective_max_tokens
        num_chunks = max(1, num_chunks)

        if num_chunks == 1:
            return [(chapter_html, 1, 1)]

        target_tokens_per_chunk = total_elem_tokens / num_chunks
        chunks = []
        current_chunk_elements = []
        current_chunk_tokens = 0

        for i, element_html in enumerate(elem_html_list):
            element_tokens = elem_tokens_list[i]

            # Oversized single element: make it a standalone chunk
            if element_tokens > effective_max_tokens:
                if current_chunk_elements:
                    chunks.append(self._create_chunk_html(current_chunk_elements))
                    current_chunk_elements = []
                    current_chunk_tokens = 0
                chunks.append(element_html)
                continue

            current_chunk_elements.append(element_html)
            current_chunk_tokens += element_tokens

            is_last = (i == len(elem_html_list) - 1)
            chunks_remaining = num_chunks - len(chunks)
            elems_remaining = len(elem_html_list) - i - 1

            should_split = False
            # Prefer splitting once we reach the target and enough elements remain
            if current_chunk_tokens >= target_tokens_per_chunk and chunks_remaining > 1:
                if elems_remaining >= (chunks_remaining - 1):
                    should_split = True
            # Hard caps by tokens or element count
            if current_chunk_tokens > effective_max_tokens:
                should_split = True
            if max_elements and len(current_chunk_elements) >= max_elements:
                should_split = True
            if is_last:
                should_split = True

            if should_split:
                chunks.append(self._create_chunk_html(current_chunk_elements))
                current_chunk_elements = []
                current_chunk_tokens = 0

        if current_chunk_elements:
            chunks.append(self._create_chunk_html(current_chunk_elements))
        if not chunks:
            chunks = [chapter_html]

        if os.getenv('DEBUG_CHUNK_SPLITTING', '0') == '1':
            print(f"[CHUNK DEBUG] Created {len(chunks)} chunks")
            for idx, chunk in enumerate(chunks, 1):
                chunk_tokens = self.count_tokens(chunk)
                print(f"[CHUNK DEBUG] Chunk {idx}: {chunk_tokens:,} tokens ({len(chunk):,} chars)")

        total_chunks = len(chunks)
        return [(chunk, i + 1, total_chunks) for i, chunk in enumerate(chunks)]

    def _split_large_element(self, element, max_tokens):
        """Split a single large element (like a long paragraph)"""
        chunks = []

        if element.name == 'p' or not hasattr(element, 'children'):
            # For paragraphs or text elements, split by sentences
            text = element.get_text()
            sentences = re.split(r'(?<=[.!?])\s+', text)

            current_chunk = []
            current_tokens = 0
            for sentence in sentences:
                sentence_tokens = self.count_tokens(sentence)
                if current_tokens + sentence_tokens > max_tokens * 0.8 and current_chunk:
                    # Create a paragraph from the accumulated sentences
                    chunk_text = ' '.join(current_chunk)
                    chunks.append(f"<p>{chunk_text}</p>")
                    current_chunk = [sentence]
                    current_tokens = sentence_tokens
                else:
                    current_chunk.append(sentence)
                    current_tokens += sentence_tokens

            if current_chunk:
                chunk_text = ' '.join(current_chunk)
                chunks.append(f"<p>{chunk_text}</p>")
") else: # For other elements, try to split by children children = list(element.children) current_chunk = [] current_tokens = 0 for child in children: child_html = str(child) child_tokens = self.count_tokens(child_html) if current_tokens + child_tokens > max_tokens * 0.8 and current_chunk: # Wrap in parent element type wrapper = BeautifulSoup(f"<{element.name}>", 'html.parser') wrapper_elem = wrapper.find(element.name) for item in current_chunk: wrapper_elem.append(BeautifulSoup(item, 'html.parser')) chunks.append(str(wrapper)) current_chunk = [child_html] current_tokens = child_tokens else: current_chunk.append(child_html) current_tokens += child_tokens if current_chunk: wrapper = BeautifulSoup(f"<{element.name}>", 'html.parser') wrapper_elem = wrapper.find(element.name) for item in current_chunk: wrapper_elem.append(BeautifulSoup(item, 'html.parser')) chunks.append(str(wrapper)) return chunks def _create_chunk_html(self, elements): """Create a valid HTML chunk from list of elements""" # Join elements and wrap in basic HTML structure if needed content = '\n'.join(elements) # Check if it already has body tags if ' 1: # Keep first body, move all content from others into it main_body = bodies[0] for extra_body in bodies[1:]: for child in list(extra_body.children): main_body.append(child) extra_body.decompose() return str(soup) return merged