# File size: 18,845 Bytes
import re
import os
from bs4 import BeautifulSoup
import tiktoken
class ChapterSplitter:
    """Split large chapters into smaller chunks while preserving structure.

    Chunks are produced either line-by-line (plain text / tag-less content)
    or element-by-element (HTML), and are balanced so that each chunk lands
    close to ``total_tokens / num_chunks`` tokens.
    """

    # Inline base64 image payloads; collapsed before counting tokens because
    # they are not meaningful for token budgeting and can explode counts.
    _DATA_URI_RE = re.compile(r'data:image/[^;]+;base64,[A-Za-z0-9+/=]+')

    # File extensions that force line-based splitting (no HTML parsing).
    _PLAIN_TEXT_SUFFIXES = ('.csv', '.json', '.txt')

    def __init__(self, model_name="gpt-3.5-turbo", target_tokens=80000, compression_factor=1.0):
        """
        Initialize splitter with token counter.

        Args:
            model_name: Model whose tiktoken encoding is used for counting.
                Falls back to the ``cl100k_base`` encoding if the lookup fails.
            target_tokens: Target size for each chunk (leaving room for
                system prompt & history).
            compression_factor: Expected compression ratio from source to
                target language (0.7-1.0). Stored on the instance; not read
                by any method of this class.
        """
        try:
            self.enc = tiktoken.encoding_for_model(model_name)
        except Exception:
            # Unknown model name: use a generic encoding instead of failing.
            self.enc = tiktoken.get_encoding("cl100k_base")
        self.target_tokens = target_tokens
        self.compression_factor = compression_factor

    def count_tokens(self, text):
        """Count tokens in text (strips data URIs to avoid huge overestimates).

        Returns:
            The encoded token count, or ``len(text) // 4`` as a rough
            estimate when the encoder is unavailable or fails.
        """
        try:
            text = self._DATA_URI_RE.sub('data:image;base64,', text)
            return len(self.enc.encode(text))
        except Exception:
            # Fallback estimation (best effort; roughly 4 chars per token).
            return len(text) // 4

    def split_chapter(self, chapter_html, max_tokens=None, filename=None):
        """
        Split a chapter into smaller chunks.

        Splits based on EITHER token limit OR line break count (the latter is
        configured through the ``BREAK_SPLIT_COUNT`` environment variable and
        is ignored for PDF files).

        Args:
            chapter_html: The chapter content (HTML or plain text).
            max_tokens: Maximum tokens per chunk; defaults to
                ``self.target_tokens``.
            filename: Optional filename to help determine content type.
                ``.csv``/``.json``/``.txt`` force line-based splitting; any
                name containing ``.pdf`` disables break-count splitting.

        Returns:
            List of ``(chunk_html, chunk_index, total_chunks)`` tuples, with
            ``chunk_index`` starting at 1.
        """
        if max_tokens is None:
            max_tokens = self.target_tokens
        effective_max_tokens = max_tokens
        debug = os.getenv('DEBUG_CHUNK_SPLITTING', '0') == '1'

        lowered_name = filename.lower() if filename else ''
        # Substring match on purpose: also catches PDF chapter names where
        # '.pdf' is not the final suffix.
        is_pdf_file = '.pdf' in lowered_name

        break_split = os.getenv('BREAK_SPLIT_COUNT', '')
        max_elements = None
        # isdecimal() (not isdigit()) so values such as '²' cannot reach
        # int() and raise ValueError.
        if break_split and break_split.isdecimal() and not is_pdf_file:
            max_elements = int(break_split)
            print(f"✅ Break split enabled: {max_elements} per chunk")

        # First check if splitting is needed
        total_tokens = self.count_tokens(chapter_html)
        if debug:
            print(f"[CHUNK DEBUG] Total tokens: {total_tokens:,}")
            print(f"[CHUNK DEBUG] Effective max tokens: {effective_max_tokens:,}")
            print(f"[CHUNK DEBUG] Max elements per chunk: {max_elements if max_elements else 'None (token-only)'}")
            print(f"[CHUNK DEBUG] Needs split: {total_tokens > effective_max_tokens}")
        if total_tokens <= effective_max_tokens and max_elements is None:
            return [(chapter_html, 1, 1)]  # No split needed

        # Known plain-text extensions skip HTML parsing entirely.
        is_plain_text_file = lowered_name.endswith(self._PLAIN_TEXT_SUFFIXES)
        if is_plain_text_file:
            if max_elements:
                print("📄 Detected plain text file format (forcing line-based splitting)")
            return self._split_plain_text(
                chapter_html, total_tokens, effective_max_tokens, max_elements, is_pdf_file
            )

        soup = BeautifulSoup(chapter_html, 'html.parser')
        if soup.body:
            elements = list(soup.body.children)
        else:
            elements = list(soup.children)
        # Whitespace-only text nodes carry no content and are ignored.
        non_empty_elements = [
            elem for elem in elements
            if not (isinstance(elem, str) and elem.strip() == '')
        ]
        has_html_tags = any(hasattr(elem, 'name') for elem in non_empty_elements)

        # No tags, or a single top-level element: element-based splitting
        # cannot help, so fall back to line-based splitting.
        if not has_html_tags or len(non_empty_elements) <= 1:
            return self._split_plain_text(
                chapter_html, total_tokens, effective_max_tokens, max_elements, is_pdf_file
            )

        return self._split_html_elements(
            non_empty_elements, chapter_html, effective_max_tokens,
            max_elements, is_pdf_file, debug,
        )

    def _split_plain_text(self, chapter_html, total_tokens, effective_max_tokens,
                          max_elements, is_pdf_file):
        """Split text on newlines into token-balanced chunks.

        Split points prefer "=== CONTEXT ... END ===" footer lines when one is
        close to the token target. Every input line ends up in exactly one
        chunk, so ``'\\n'.join(chunks)`` reproduces the input.
        """
        lines = chapter_html.split('\n')
        if max_elements and not is_pdf_file:
            print(f"📝 Total lines in file: {len(lines):,}")

        line_tokens = [self.count_tokens(line) for line in lines]

        if max_elements:
            num_chunks = (len(lines) + max_elements - 1) // max_elements
        else:
            num_chunks = (total_tokens + effective_max_tokens - 1) // effective_max_tokens
        # Guard against 0 (e.g. empty text), which would divide by zero below.
        num_chunks = max(1, num_chunks)
        if num_chunks == 1:
            return [(chapter_html, 1, 1)]

        target_tokens_per_chunk = total_tokens / num_chunks

        # Lines that close a gender-context block; preferred split boundaries.
        footer_indices = {
            idx for idx, line in enumerate(lines)
            if line.strip().startswith("=== CONTEXT ") and "END ===" in line
        }

        # Prefix sums give the token count of any line range in O(1).
        prefix_tokens = [0]
        for tok in line_tokens:
            prefix_tokens.append(prefix_tokens[-1] + tok)

        def tokens_between(start_idx, end_exclusive):
            return prefix_tokens[end_exclusive] - prefix_tokens[start_idx]

        chunks = []
        chunk_start = 0  # index of the first line not yet emitted
        last_index = len(lines) - 1

        for i in range(len(lines)):
            # A forward snap may already have emitted this line.
            if i < chunk_start:
                continue

            current_tokens = tokens_between(chunk_start, i + 1)
            chunks_remaining = num_chunks - len(chunks)
            lines_remaining = last_index - i

            if current_tokens >= target_tokens_per_chunk and chunks_remaining > 1:
                # Only split if enough lines are left to feed the other chunks.
                should_split = lines_remaining >= chunks_remaining - 1
            elif current_tokens > effective_max_tokens:
                should_split = True
            else:
                should_split = (i == last_index)
            if not should_split:
                continue

            # Prefer the footer whose chunk size lands closest to the target.
            prev_footer = max(
                (idx for idx in footer_indices if chunk_start <= idx <= i), default=None
            )
            next_footer = min((idx for idx in footer_indices if idx > i), default=None)

            best_split_idx = None
            best_diff = None
            if prev_footer is not None:
                best_diff = abs(tokens_between(chunk_start, prev_footer + 1) - target_tokens_per_chunk)
                best_split_idx = prev_footer + 1
            if next_footer is not None:
                t_next = tokens_between(chunk_start, next_footer + 1)
                # Allow a forward snap only within 15% over the hard cap.
                if t_next <= effective_max_tokens * 1.15:
                    diff = abs(t_next - target_tokens_per_chunk)
                    if best_diff is None or diff < best_diff:
                        best_split_idx, best_diff = next_footer + 1, diff
            if best_split_idx is None:
                best_split_idx = i + 1  # fallback to current boundary

            chunks.append('\n'.join(lines[chunk_start:best_split_idx]))
            chunk_start = best_split_idx

        # Whatever was not emitted by a split forms the final chunk.
        remainder = lines[chunk_start:]
        if remainder:
            chunks.append('\n'.join(remainder))
        if not chunks:
            chunks = [chapter_html]

        total_chunks = len(chunks)
        return [(chunk, idx + 1, total_chunks) for idx, chunk in enumerate(chunks)]

    def _split_html_elements(self, elements, chapter_html, effective_max_tokens,
                             max_elements, is_pdf_file, debug):
        """Group top-level HTML elements into token-balanced chunks.

        ``elements`` are the non-empty top-level nodes. An element larger than
        ``effective_max_tokens`` becomes a chunk of its own; ``max_elements``
        (when set) additionally caps the number of elements per chunk.
        """
        if max_elements and not is_pdf_file:
            total_elements = len(elements)
            print(f"🏷️ Total HTML elements in file: {total_elements:,}")

        elem_html_list = [str(element) for element in elements]
        if not elem_html_list:
            return [(chapter_html, 1, 1)]
        elem_tokens_list = [self.count_tokens(html) for html in elem_html_list]
        total_elem_tokens = sum(elem_tokens_list)

        if max_elements:
            num_chunks = (len(elem_html_list) + max_elements - 1) // max_elements
        else:
            num_chunks = (total_elem_tokens + effective_max_tokens - 1) // effective_max_tokens
        num_chunks = max(1, num_chunks)
        if num_chunks == 1:
            return [(chapter_html, 1, 1)]

        target_tokens_per_chunk = total_elem_tokens / num_chunks
        last_index = len(elem_html_list) - 1

        chunks = []
        pending = []        # element HTML strings of the chunk being built
        pending_tokens = 0

        for i, (element_html, element_tokens) in enumerate(zip(elem_html_list, elem_tokens_list)):
            # Oversized single element: make it a standalone chunk
            if element_tokens > effective_max_tokens:
                if pending:
                    chunks.append(self._create_chunk_html(pending))
                    pending = []
                    pending_tokens = 0
                chunks.append(element_html)
                continue

            pending.append(element_html)
            pending_tokens += element_tokens

            chunks_remaining = num_chunks - len(chunks)
            elems_remaining = last_index - i

            reached_target = (
                pending_tokens >= target_tokens_per_chunk
                and chunks_remaining > 1
                and elems_remaining >= chunks_remaining - 1
            )
            over_token_cap = pending_tokens > effective_max_tokens
            over_element_cap = bool(max_elements) and len(pending) >= max_elements

            if reached_target or over_token_cap or over_element_cap or i == last_index:
                chunks.append(self._create_chunk_html(pending))
                pending = []
                pending_tokens = 0

        if pending:
            chunks.append(self._create_chunk_html(pending))
        if not chunks:
            chunks = [chapter_html]

        if debug:
            print(f"[CHUNK DEBUG] Created {len(chunks)} chunks")
            for idx, chunk in enumerate(chunks, 1):
                chunk_tokens = self.count_tokens(chunk)
                print(f"[CHUNK DEBUG] Chunk {idx}: {chunk_tokens:,} tokens ({len(chunk):,} chars)")

        total_chunks = len(chunks)
        return [(chunk, idx + 1, total_chunks) for idx, chunk in enumerate(chunks)]

    def _split_large_element(self, element, max_tokens):
        """Split a single large element (like a long paragraph).

        Paragraphs (and nodes without children) are split on sentence
        boundaries and re-wrapped in ``<p>``; other elements are split between
        their children and re-wrapped in a tag of the same name. A new piece
        is started once the running total would exceed 80% of ``max_tokens``.

        Returns:
            List of HTML strings. Not called by ``split_chapter``.
        """
        budget = max_tokens * 0.8

        if element.name == 'p' or not hasattr(element, 'children'):
            # For paragraphs or text elements, split by sentences
            pieces = re.split(r'(?<=[.!?])\s+', element.get_text())

            def render(group):
                chunk_text = ' '.join(group)
                return f"<p>{chunk_text}</p>"
        else:
            # For other elements, try to split by children
            pieces = [str(child) for child in element.children]

            def render(group):
                return self._wrap_in_tag(element.name, group)

        chunks = []
        current = []
        current_tokens = 0
        for piece in pieces:
            piece_tokens = self.count_tokens(piece)
            if current and current_tokens + piece_tokens > budget:
                chunks.append(render(current))
                current = [piece]
                current_tokens = piece_tokens
            else:
                current.append(piece)
                current_tokens += piece_tokens
        if current:
            chunks.append(render(current))
        return chunks

    @staticmethod
    def _wrap_in_tag(tag_name, html_items):
        """Return the HTML fragments re-parsed and nested inside ``<tag_name>``."""
        wrapper = BeautifulSoup(f"<{tag_name}></{tag_name}>", 'html.parser')
        wrapper_elem = wrapper.find(tag_name)
        for item in html_items:
            wrapper_elem.append(BeautifulSoup(item, 'html.parser'))
        return str(wrapper)

    def _create_chunk_html(self, elements):
        """Create an HTML chunk by joining element strings with newlines.

        No wrapper markup is added; the translation step is expected to
        handle bare fragments.
        """
        return '\n'.join(elements)

    def merge_translated_chunks(self, translated_chunks):
        """
        Merge translated chunks back together.

        Args:
            translated_chunks: List of (translated_html, chunk_index, total_chunks).

        Returns:
            The chunk contents joined with newlines in ``chunk_index`` order.
            If the result contains more than one ``<body>`` tag, the contents
            of the extra bodies are moved into the first one and the
            re-serialized document is returned instead.
        """
        # Sort by chunk index to ensure correct order
        ordered = sorted(translated_chunks, key=lambda item: item[1])
        merged = '\n'.join(item[0] for item in ordered)

        # Fewer than two "<body" occurrences means there cannot be duplicate
        # body tags, so the (expensive) parse is unnecessary.
        if merged.lower().count('<body') < 2:
            return merged

        soup = BeautifulSoup(merged, 'html.parser')
        bodies = soup.find_all('body')
        if len(bodies) > 1:
            # Keep first body, move all content from others into it
            main_body = bodies[0]
            for extra_body in bodies[1:]:
                for child in list(extra_body.children):
                    main_body.append(child)
                extra_body.decompose()
            return str(soup)
        return merged
|