| """ | |
| Doctra - Document Parser for Hugging Face Spaces | |
| This is a Hugging Face Spaces deployment of the Doctra document parsing library. | |
| It provides a comprehensive web interface for PDF parsing, table/chart extraction, | |
| image restoration, and enhanced document processing. | |
| """ | |
| import os | |
| import shutil | |
| import tempfile | |
| import re | |
| import html as _html | |
| import base64 | |
| import json | |
| from pathlib import Path | |
| from typing import Optional, Tuple, List, Dict, Any | |
| import gradio as gr | |
| import pandas as pd | |
| # Mock google.genai to avoid import errors | |
| import sys | |
| from unittest.mock import MagicMock | |
| # Create a mock google.genai module | |
| mock_google_genai = MagicMock() | |
| sys.modules['google.genai'] = mock_google_genai | |
| sys.modules['google.genai.types'] = MagicMock() | |
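# NOTE: doctra (or one of its VLM providers) imports google.genai at import
# time, and on Spaces that package can conflict with other pinned dependencies
# (see the Tips section below). The MagicMock stub keeps the import chain
# working; correspondingly, "gemini" is left out of the provider dropdowns.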
# Now import Doctra components
try:
    from doctra.parsers.structured_pdf_parser import StructuredPDFParser
    from doctra.parsers.table_chart_extractor import ChartTablePDFParser
    from doctra.parsers.enhanced_pdf_parser import EnhancedPDFParser
    from doctra.ui.docres_wrapper import DocResUIWrapper
    from doctra.utils.pdf_io import render_pdf_to_images
except ImportError as e:
    print(f"Warning: Some Doctra components may not be available: {e}")
    # Fall back to None so each handler can report a clear error to the user
    StructuredPDFParser = None
    ChartTablePDFParser = None
    EnhancedPDFParser = None
    DocResUIWrapper = None
    render_pdf_to_images = None
# UI Theme and Styling Constants
THEME = gr.themes.Soft(primary_hue="indigo", neutral_hue="slate")

CUSTOM_CSS = """
/* Full-width layout */
.gradio-container {max-width: 100% !important; padding-left: 24px; padding-right: 24px}
.container {max-width: 100% !important}
.app {max-width: 100% !important}

/* Header and helpers */
.header {margin-bottom: 8px}
.subtitle {color: var(--body-text-color-subdued)}
.card {border:1px solid var(--border-color); border-radius:12px; padding:8px}
.status-ok {color: var(--color-success)}

/* Scrollable gallery styling */
.scrollable-gallery {
  max-height: 600px !important;
  overflow-y: auto !important;
  border: 1px solid var(--border-color) !important;
  border-radius: 8px !important;
  padding: 8px !important;
}

/* Page content styling */
.page-content img {
  max-width: 100% !important;
  height: auto !important;
  display: block !important;
  margin: 10px auto !important;
  border: 1px solid #ddd !important;
  border-radius: 8px !important;
  box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
}
.page-content {
  max-height: none !important;
  overflow: visible !important;
}

/* Table styling */
.page-content table.doc-table {
  width: 100% !important;
  border-collapse: collapse !important;
  margin: 12px 0 !important;
}
.page-content table.doc-table th,
.page-content table.doc-table td {
  border: 1px solid #e5e7eb !important;
  padding: 8px 10px !important;
  text-align: left !important;
}
.page-content table.doc-table thead th {
  background: #f9fafb !important;
  font-weight: 600 !important;
}
.page-content table.doc-table tbody tr:nth-child(even) td {
  background: #fafafa !important;
}

/* Clickable image buttons */
.image-button {
  background: #0066cc !important;
  color: white !important;
  border: none !important;
  padding: 5px 10px !important;
  border-radius: 4px !important;
  cursor: pointer !important;
  margin: 2px !important;
  font-size: 14px !important;
}
.image-button:hover {
  background: #0052a3 !important;
}
"""
def gather_outputs(
    out_dir: Path,
    allowed_kinds: Optional[List[str]] = None,
    zip_filename: Optional[str] = None,
    is_structured_parsing: bool = False
) -> Tuple[List[tuple[str, str]], List[str], str]:
    """
    Gather output files and create a ZIP archive for download.
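
    Returns a 3-tuple ``(gallery_items, file_paths, zip_path)``:
    ``gallery_items`` holds ``(image_path, caption)`` pairs for UI galleries,
    ``file_paths`` lists the individual files offered for download, and
    ``zip_path`` is the path of a ZIP archive built from a filtered copy of
    ``out_dir`` (Office lock files and ``*.tmp``/``*.temp`` are excluded).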
| """ | |
| gallery_items: List[tuple[str, str]] = [] | |
| file_paths: List[str] = [] | |
| if out_dir.exists(): | |
| if is_structured_parsing: | |
| # For structured parsing, include all files | |
| for file_path in sorted(out_dir.rglob("*")): | |
| if file_path.is_file(): | |
| file_paths.append(str(file_path)) | |
| else: | |
| # For full parsing, include specific main files | |
| main_files = [ | |
| "result.html", | |
| "result.md", | |
| "tables.html", | |
| "tables.xlsx" | |
| ] | |
| for main_file in main_files: | |
| file_path = out_dir / main_file | |
| if file_path.exists(): | |
| file_paths.append(str(file_path)) | |
| # Include images based on allowed kinds | |
| if allowed_kinds: | |
| for kind in allowed_kinds: | |
| p = out_dir / kind | |
| if p.exists(): | |
| for img in sorted(p.glob("*.png")): | |
| file_paths.append(str(img)) | |
| images_dir = out_dir / "images" / kind | |
| if images_dir.exists(): | |
| for img in sorted(images_dir.glob("*.jpg")): | |
| file_paths.append(str(img)) | |
| else: | |
| # Include all images if no specific kinds specified | |
| for p in (out_dir / "charts").glob("*.png"): | |
| file_paths.append(str(p)) | |
| for p in (out_dir / "tables").glob("*.png"): | |
| file_paths.append(str(p)) | |
| for p in (out_dir / "images").rglob("*.jpg"): | |
| file_paths.append(str(p)) | |
| # Include Excel files based on allowed kinds | |
| if allowed_kinds: | |
| if "charts" in allowed_kinds and "tables" in allowed_kinds: | |
| excel_files = ["parsed_tables_charts.xlsx"] | |
| elif "charts" in allowed_kinds: | |
| excel_files = ["parsed_charts.xlsx"] | |
| elif "tables" in allowed_kinds: | |
| excel_files = ["parsed_tables.xlsx"] | |
| else: | |
| excel_files = [] | |
| for excel_file in excel_files: | |
| excel_path = out_dir / excel_file | |
| if excel_path.exists(): | |
| file_paths.append(str(excel_path)) | |
| # Build gallery items for image display | |
| kinds = allowed_kinds if allowed_kinds else ["tables", "charts", "figures"] | |
| for sub in kinds: | |
| p = out_dir / sub | |
| if p.exists(): | |
| for img in sorted(p.glob("*.png")): | |
| gallery_items.append((str(img), f"{sub}: {img.name}")) | |
| images_dir = out_dir / "images" / sub | |
| if images_dir.exists(): | |
| for img in sorted(images_dir.glob("*.jpg")): | |
| gallery_items.append((str(img), f"{sub}: {img.name}")) | |
| # Create ZIP archive | |
| tmp_zip_dir = Path(tempfile.mkdtemp(prefix="doctra_zip_")) | |
| if zip_filename: | |
| safe_filename = re.sub(r'[<>:"/\\|?*]', '_', zip_filename) | |
| zip_base = tmp_zip_dir / safe_filename | |
| else: | |
| zip_base = tmp_zip_dir / "doctra_outputs" | |
| filtered_dir = tmp_zip_dir / "filtered_outputs" | |
| shutil.copytree(out_dir, filtered_dir, ignore=shutil.ignore_patterns('~$*', '*.tmp', '*.temp')) | |
| zip_path = shutil.make_archive(str(zip_base), 'zip', root_dir=str(filtered_dir)) | |
| return gallery_items, file_paths, zip_path | |
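
# Example call (hypothetical paths, assuming a finished tables-extraction run):
#   gallery, files, zip_path = gather_outputs(
#       Path("outputs/report/structured_parsing"),
#       allowed_kinds=["tables"],
#       zip_filename="report",
#       is_structured_parsing=True,
#   )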
def validate_vlm_config(use_vlm: bool, vlm_api_key: str, vlm_provider: str = "gemini") -> Optional[str]:
    """
    Validate VLM configuration parameters.
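
    Returns an error-message string when the configuration is invalid, or
    None when it is acceptable (including when VLM is disabled, or when the
    provider is Ollama, which runs locally and needs no API key).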
| """ | |
| if use_vlm and vlm_provider not in ["ollama"] and not vlm_api_key: | |
| return "β Error: VLM API key is required when using VLM (except for Ollama)" | |
| if use_vlm and vlm_api_key and vlm_provider not in ["ollama"]: | |
| # Basic API key validation | |
| if len(vlm_api_key.strip()) < 10: | |
| return "β Error: VLM API key appears to be too short or invalid" | |
| if vlm_api_key.strip().startswith('sk-') and len(vlm_api_key.strip()) < 20: | |
| return "β Error: OpenAI API key appears to be invalid (too short)" | |
| return None | |
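
# For example:
#   validate_vlm_config(True, "", "openai")  -> "❌ Error: VLM API key is required ..."
#   validate_vlm_config(True, "", "ollama")  -> None  (local provider, no key needed)
#   validate_vlm_config(False, "", "openai") -> None  (VLM disabled)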
def create_page_html_content(page_content: List[str], base_dir: Optional[Path] = None) -> str:
    """
    Convert page content lines to HTML with inline images and proper formatting.
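
    Image references of the form ``![caption](path)`` are inlined as base64
    data URIs (resolved against ``base_dir``), markdown tables become styled
    HTML tables, ``#``/``##`` headings become ``<h2>``/``<h3>``, and
    consecutive plain lines are joined into paragraphs.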
| """ | |
| processed_content = [] | |
| paragraph_buffer = [] | |
| def flush_paragraph(): | |
| """Flush accumulated paragraph content to HTML""" | |
| nonlocal paragraph_buffer | |
| if paragraph_buffer: | |
| joined = '<br/>'.join(_html.escape(l) for l in paragraph_buffer) | |
| processed_content.append(f'<p>{joined}</p>') | |
| paragraph_buffer = [] | |
    def is_markdown_table_header(s: str) -> bool:
        # A separator row contains pipes plus '---' (or an em-dash variant
        # that some OCR/VLM output produces)
        return '|' in s and ('---' in s or '—' in s)
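
    # The loop below treats a line as the start of a table when the next
    # line is a separator row, i.e. the classic two-line markdown head:
    #   | Name | Value |
    #   | ---  | ---   |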
    def render_markdown_table(lines: List[str]) -> str:
        rows = [l.strip().strip('|').split('|') for l in lines]
        rows = [[_html.escape(c.strip()) for c in r] for r in rows]
        if len(rows) < 2:
            return ""
        header = rows[0]
        body = rows[2:] if len(rows) > 2 else []
        thead = '<thead><tr>' + ''.join(f'<th>{c}</th>' for c in header) + '</tr></thead>'
        tbody = '<tbody>' + ''.join('<tr>' + ''.join(f'<td>{c}</td>' for c in r) + '</tr>' for r in body) + '</tbody>'
        return f'<table class="doc-table">{thead}{tbody}</table>'
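
    # e.g. render_markdown_table(["| A | B |", "| --- | --- |", "| 1 | 2 |"])
    # yields '<table class="doc-table"><thead><tr><th>A</th><th>B</th></tr>
    # </thead><tbody><tr><td>1</td><td>2</td></tr></tbody></table>';
    # rows[1], the separator row, is intentionally skipped.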
    i = 0
    n = len(page_content)
    while i < n:
        raw_line = page_content[i]
        line = raw_line.rstrip('\r\n')
        stripped = line.strip()
        # Handle image references of the form ![caption](path)
        if stripped.startswith('!['):
            flush_paragraph()
            match = re.match(r'!\[([^\]]+)\]\(([^)]+)\)', stripped)
            if match and base_dir is not None:
                caption = match.group(1)
                rel_path = match.group(2).replace('\\\\', '/').replace('\\', '/').lstrip('/')
                abs_path = (base_dir / rel_path).resolve()
                try:
                    with open(abs_path, 'rb') as f:
                        b64 = base64.b64encode(f.read()).decode('ascii')
                    processed_content.append(f'<figure><img src="data:image/jpeg;base64,{b64}" alt="{_html.escape(caption)}"/><figcaption>{_html.escape(caption)}</figcaption></figure>')
                except Exception as e:
                    print(f"❌ Failed to embed image {rel_path}: {e}")
                    processed_content.append(f'<div>{_html.escape(caption)} (image not found)</div>')
            else:
                processed_content.append(f'<div>{_html.escape(stripped)}</div>')
            i += 1
            continue
        # Handle markdown tables
        if (stripped.startswith('|') or stripped.count('|') >= 2) and i + 1 < n and is_markdown_table_header(page_content[i + 1]):
            flush_paragraph()
            table_block = [stripped]
            i += 1
            table_block.append(page_content[i].strip())
            i += 1
            while i < n:
                nxt = page_content[i].rstrip('\r\n')
                if nxt.strip() == '' or (not nxt.strip().startswith('|') and nxt.count('|') < 2):
                    break
                table_block.append(nxt.strip())
                i += 1
            html_table = render_markdown_table(table_block)
            if html_table:
                processed_content.append(html_table)
            else:
                for tl in table_block:
                    paragraph_buffer.append(tl)
            continue
        # Handle headers and plain content
        if stripped.startswith('## '):
            flush_paragraph()
            processed_content.append(f'<h3>{_html.escape(stripped[3:])}</h3>')
        elif stripped.startswith('# '):
            flush_paragraph()
            processed_content.append(f'<h2>{_html.escape(stripped[2:])}</h2>')
        elif stripped == '':
            flush_paragraph()
            processed_content.append('<br/>')
        else:
            paragraph_buffer.append(raw_line)
        i += 1

    flush_paragraph()
    return "\n".join(processed_content)
def run_full_parse(
    pdf_file: str,
    use_vlm: bool,
    vlm_provider: str,
    vlm_api_key: str,
    layout_model_name: str,
    dpi: int,
    min_score: float,
    ocr_lang: str,
    ocr_psm: int,
    ocr_oem: int,
    ocr_extra_config: str,
    box_separator: str,
) -> Tuple[str, Optional[str], List[tuple[str, str]], List[str], str]:
    """Run full PDF parsing with structured output."""
    if not pdf_file:
        return ("No file provided.", None, [], [], "")

    # Check if Doctra components are available
    if StructuredPDFParser is None:
        return ("❌ Error: Doctra library not properly installed. Please check the requirements.", None, [], [], "")

    # Validate VLM configuration
    vlm_error = validate_vlm_config(use_vlm, vlm_api_key, vlm_provider)
    if vlm_error:
        return (vlm_error, None, [], [], "")

    original_filename = Path(pdf_file).stem

    # Create temporary directory for processing
    tmp_dir = Path(tempfile.mkdtemp(prefix="doctra_"))
    input_pdf = tmp_dir / f"{original_filename}.pdf"
    shutil.copy2(pdf_file, input_pdf)

    # Initialize parser with configuration
    parser = StructuredPDFParser(
        use_vlm=use_vlm,
        vlm_provider=vlm_provider,
        vlm_api_key=vlm_api_key or None,
        layout_model_name=layout_model_name,
        dpi=int(dpi),
        min_score=float(min_score),
        ocr_lang=ocr_lang,
        ocr_psm=int(ocr_psm),
        ocr_oem=int(ocr_oem),
        ocr_extra_config=ocr_extra_config or "",
        box_separator=box_separator or "\n",
    )

    try:
        parser.parse(str(input_pdf))
    except Exception as e:
        import traceback
        traceback.print_exc()
        try:
            error_msg = str(e).encode('utf-8', errors='replace').decode('utf-8')
            return (f"❌ VLM processing failed: {error_msg}", None, [], [], "")
        except Exception:
            return ("❌ VLM processing failed: <Unicode encoding error>", None, [], [], "")
    # Find output directory
    outputs_root = Path("outputs")
    out_dir = outputs_root / original_filename / "full_parse"
    if not out_dir.exists():
        candidates = sorted(outputs_root.glob("*/"), key=lambda p: p.stat().st_mtime, reverse=True)
        if candidates:
            out_dir = candidates[0] / "full_parse"
        else:
            out_dir = outputs_root

    # Read markdown file if it exists
    md_file = next(out_dir.glob("*.md"), None)
    md_preview = None
    if md_file and md_file.exists():
        try:
            with md_file.open("r", encoding="utf-8", errors="ignore") as f:
                md_preview = f.read()
        except Exception:
            md_preview = None

    # Gather output files and create ZIP
    gallery_items, file_paths, zip_path = gather_outputs(
        out_dir,
        zip_filename=original_filename,
        is_structured_parsing=False
    )
    return (
        f"✅ Parsing completed successfully!\n📁 Output directory: {out_dir}",
        md_preview,
        gallery_items,
        file_paths,
        zip_path
    )
def run_extract(
    pdf_file: str,
    target: str,
    use_vlm: bool,
    vlm_provider: str,
    vlm_api_key: str,
    layout_model_name: str,
    dpi: int,
    min_score: float,
) -> Tuple[str, str, List[tuple[str, str]], List[str], str]:
    """Run table/chart extraction from PDF."""
    if not pdf_file:
        return ("No file provided.", "", [], [], "")

    # Check if Doctra components are available
    if ChartTablePDFParser is None:
        return ("❌ Error: Doctra library not properly installed. Please check the requirements.", "", [], [], "")

    # Validate VLM configuration
    vlm_error = validate_vlm_config(use_vlm, vlm_api_key, vlm_provider)
    if vlm_error:
        return (vlm_error, "", [], [], "")

    original_filename = Path(pdf_file).stem

    # Create temporary directory for processing
    tmp_dir = Path(tempfile.mkdtemp(prefix="doctra_"))
    input_pdf = tmp_dir / f"{original_filename}.pdf"
    shutil.copy2(pdf_file, input_pdf)

    # Initialize parser with configuration
    parser = ChartTablePDFParser(
        extract_charts=(target in ("charts", "both")),
        extract_tables=(target in ("tables", "both")),
        use_vlm=use_vlm,
        vlm_provider=vlm_provider,
        vlm_api_key=vlm_api_key or None,
        layout_model_name=layout_model_name,
        dpi=int(dpi),
        min_score=float(min_score),
    )

    # Run extraction
    output_base = Path("outputs")
    parser.parse(str(input_pdf), str(output_base))

    # Find output directory
    outputs_root = output_base
    out_dir = outputs_root / original_filename / "structured_parsing"
    if not out_dir.exists():
        if outputs_root.exists():
            candidates = sorted(outputs_root.glob("*/"), key=lambda p: p.stat().st_mtime, reverse=True)
            if candidates:
                out_dir = candidates[0] / "structured_parsing"
            else:
                out_dir = outputs_root
        else:
            outputs_root.mkdir(parents=True, exist_ok=True)
            out_dir = outputs_root

    # Determine which kinds to include in outputs based on target selection
    allowed_kinds: Optional[List[str]] = None
    if target in ("tables", "charts"):
        allowed_kinds = [target]
    elif target == "both":
        allowed_kinds = ["tables", "charts"]

    # Gather output files and create ZIP
    gallery_items, file_paths, zip_path = gather_outputs(
        out_dir,
        allowed_kinds,
        zip_filename=original_filename,
        is_structured_parsing=True
    )
    # Build tables HTML preview from Excel data (when VLM enabled)
    tables_html = ""
    try:
        if use_vlm:
            # Find Excel file based on target
            excel_filename = None
            if target in ("tables", "charts"):
                if target == "tables":
                    excel_filename = "parsed_tables.xlsx"
                else:  # charts
                    excel_filename = "parsed_charts.xlsx"
            elif target == "both":
                excel_filename = "parsed_tables_charts.xlsx"
            if excel_filename:
                excel_path = out_dir / excel_filename
                if excel_path.exists():
                    # Read Excel file and create HTML tables
                    xl_file = pd.ExcelFile(excel_path)
                    html_blocks = []
                    for sheet_name in xl_file.sheet_names:
                        df = pd.read_excel(excel_path, sheet_name=sheet_name)
                        if not df.empty:
                            # Create table with title
                            title = f"<h3>{_html.escape(sheet_name)}</h3>"
                            # Convert DataFrame to HTML table
                            table_html = df.to_html(
                                classes="doc-table",
                                table_id=None,
                                escape=True,
                                index=False,
                                na_rep=""
                            )
                            html_blocks.append(title + table_html)
                    tables_html = "\n".join(html_blocks)
    except Exception as e:
        try:
            error_msg = str(e).encode('utf-8', errors='replace').decode('utf-8')
            print(f"Error building tables HTML: {error_msg}")
        except Exception:
            print("Error building tables HTML: <Unicode encoding error>")
        tables_html = ""
    return (
        f"✅ Extraction completed successfully!\n📁 Output directory: {out_dir}",
        tables_html,
        gallery_items,
        file_paths,
        zip_path
    )
def run_docres_restoration(
    pdf_file: str,
    task: str,
    device: str,
    dpi: int,
    save_enhanced: bool,
    save_images: bool
) -> Tuple[str, Optional[str], Optional[str], Optional[dict], List[str]]:
    """Run DocRes image restoration on PDF."""
    if not pdf_file:
        return ("No file provided.", None, None, None, [])

    # Check if Doctra components are available
    if DocResUIWrapper is None:
        return ("❌ Error: Doctra library not properly installed. Please check the requirements.", None, None, None, [])

    try:
        # Initialize DocRes engine
        device_str = None if device == "auto" else device
        docres = DocResUIWrapper(device=device_str)

        # Extract filename
        original_filename = Path(pdf_file).stem

        # Create output directory
        output_dir = Path("outputs") / f"{original_filename}_docres"
        output_dir.mkdir(parents=True, exist_ok=True)

        # Run DocRes restoration
        enhanced_pdf_path = output_dir / f"{original_filename}_enhanced.pdf"
        docres.restore_pdf(
            pdf_path=pdf_file,
            output_path=str(enhanced_pdf_path),
            task=task,
            dpi=dpi
        )

        # Prepare outputs
        file_paths = []
        if save_enhanced and enhanced_pdf_path.exists():
            file_paths.append(str(enhanced_pdf_path))
        if save_images:
            # Look for enhanced images
            images_dir = output_dir / "enhanced_images"
            if images_dir.exists():
                for img_path in sorted(images_dir.glob("*.jpg")):
                    file_paths.append(str(img_path))

        # Create metadata
        metadata = {
            "task": task,
            "device": str(docres.device),
            "dpi": dpi,
            "original_file": pdf_file,
            "enhanced_file": str(enhanced_pdf_path) if enhanced_pdf_path.exists() else None,
            "output_directory": str(output_dir)
        }
        status_msg = f"✅ DocRes restoration completed successfully!\n📁 Output directory: {output_dir}"
        enhanced_pdf_file = str(enhanced_pdf_path) if enhanced_pdf_path.exists() else None
        return (status_msg, pdf_file, enhanced_pdf_file, metadata, file_paths)
    except Exception as e:
        error_msg = f"❌ DocRes restoration failed: {str(e)}"
        return (error_msg, None, None, None, [])
def run_enhanced_parse(
    pdf_file: str,
    use_image_restoration: bool,
    restoration_task: str,
    restoration_device: str,
    restoration_dpi: int,
    use_vlm: bool,
    vlm_provider: str,
    vlm_api_key: str,
    layout_model_name: str,
    dpi: int,
    min_score: float,
    ocr_lang: str,
    ocr_psm: int,
    ocr_oem: int,
    ocr_extra_config: str,
    box_separator: str,
) -> Tuple[str, Optional[str], List[str], str, Optional[str], Optional[str], str]:
    """Run enhanced PDF parsing with DocRes image restoration."""
    if not pdf_file:
        return ("No file provided.", None, [], "", None, None, "")

    # Check if Doctra components are available
    if EnhancedPDFParser is None:
        return ("❌ Error: Doctra library not properly installed. Please check the requirements.", None, [], "", None, None, "")

    # Validate VLM configuration if VLM is enabled
    if use_vlm:
        vlm_error = validate_vlm_config(use_vlm, vlm_api_key, vlm_provider)
        if vlm_error:
            return (vlm_error, None, [], "", None, None, "")

    original_filename = Path(pdf_file).stem

    # Create temporary directory for processing
    tmp_dir = Path(tempfile.mkdtemp(prefix="doctra_enhanced_"))
    input_pdf = tmp_dir / f"{original_filename}.pdf"
    shutil.copy2(pdf_file, input_pdf)

    try:
        # Initialize enhanced parser with configuration
        parser = EnhancedPDFParser(
            use_image_restoration=use_image_restoration,
            restoration_task=restoration_task,
            restoration_device=restoration_device if restoration_device != "auto" else None,
            restoration_dpi=int(restoration_dpi),
            use_vlm=use_vlm,
            vlm_provider=vlm_provider,
            vlm_api_key=vlm_api_key or None,
            layout_model_name=layout_model_name,
            dpi=int(dpi),
            min_score=float(min_score),
            ocr_lang=ocr_lang,
            ocr_psm=int(ocr_psm),
            ocr_oem=int(ocr_oem),
            ocr_extra_config=ocr_extra_config or "",
            box_separator=box_separator or "\n",
        )
        # Parse the PDF with enhancement
        parser.parse(str(input_pdf))
    except Exception as e:
        import traceback
        traceback.print_exc()
        try:
            error_msg = str(e).encode('utf-8', errors='replace').decode('utf-8')
            return (f"❌ Enhanced parsing failed: {error_msg}", None, [], "", None, None, "")
        except Exception:
            return ("❌ Enhanced parsing failed: <Unicode encoding error>", None, [], "", None, None, "")
    # Find output directory
    outputs_root = Path("outputs")
    out_dir = outputs_root / original_filename / "enhanced_parse"
    if not out_dir.exists():
        candidates = sorted(outputs_root.glob("*/"), key=lambda p: p.stat().st_mtime, reverse=True)
        if candidates:
            out_dir = candidates[0] / "enhanced_parse"
        else:
            out_dir = outputs_root

    # If still no enhanced_parse directory, try to find any directory with enhanced files
    if not out_dir.exists():
        for candidate_dir in outputs_root.rglob("*"):
            if candidate_dir.is_dir():
                enhanced_pdfs = list(candidate_dir.glob("*enhanced*.pdf"))
                if enhanced_pdfs:
                    out_dir = candidate_dir
                    break

    # Load first page content initially
    md_preview = None
    try:
        pages_dir = out_dir / "pages"
        first_page_path = pages_dir / "page_001.md"
        if first_page_path.exists():
            with first_page_path.open("r", encoding="utf-8", errors="ignore") as f:
                md_content = f.read()
            md_lines = md_content.split('\n')
            md_preview = create_page_html_content(md_lines, out_dir)
        else:
            md_file = next(out_dir.glob("*.md"), None)
            if md_file and md_file.exists():
                with md_file.open("r", encoding="utf-8", errors="ignore") as f:
                    md_content = f.read()
                md_lines = md_content.split('\n')
                md_preview = create_page_html_content(md_lines, out_dir)
    except Exception as e:
        print(f"❌ Error loading initial content: {e}")
        md_preview = None

    # Gather output files and create ZIP
    _, file_paths, zip_path = gather_outputs(
        out_dir,
        zip_filename=f"{original_filename}_enhanced",
        is_structured_parsing=False
    )

    # Look for enhanced PDF file
    enhanced_pdf_path = None
    if use_image_restoration:
        enhanced_pdf_candidates = list(out_dir.glob("*enhanced*.pdf"))
        if enhanced_pdf_candidates:
            enhanced_pdf_path = str(enhanced_pdf_candidates[0])
        else:
            parent_enhanced = list(out_dir.parent.glob("*enhanced*.pdf"))
            if parent_enhanced:
                enhanced_pdf_path = str(parent_enhanced[0])

    return (
        f"✅ Enhanced parsing completed successfully!\n📁 Output directory: {out_dir}",
        md_preview,
        file_paths,
        zip_path,
        pdf_file,           # Original PDF path
        enhanced_pdf_path,  # Enhanced PDF path
        str(out_dir)        # Output directory for page-specific content
    )
def create_tips_markdown() -> str:
    """Create the tips section markdown for the UI."""
    return """
<div class="card">
  <b>Tips</b>
  <ul>
    <li>On Spaces, set a secret <code>VLM_API_KEY</code> to enable VLM features.</li>
    <li>Use <strong>Enhanced Parser</strong> for documents that need image restoration before parsing (scanned docs, low-quality PDFs).</li>
    <li>Use <strong>DocRes Image Restoration</strong> for standalone image enhancement without parsing.</li>
    <li>DocRes tasks: <code>appearance</code> (default), <code>dewarping</code>, <code>deshadowing</code>, <code>deblurring</code>, <code>binarization</code>, <code>end2end</code>.</li>
    <li>Outputs are saved under <code>outputs/&lt;pdf_stem&gt;/</code>.</li>
    <li><strong>Note:</strong> Google Gemini VLM may not be available due to dependency conflicts. Use OpenAI, Anthropic, or other VLM providers.</li>
  </ul>
</div>
"""
# Create the main Gradio interface
with gr.Blocks(title="Doctra - Document Parser", theme=THEME, css=CUSTOM_CSS) as demo:
    # Header section
    gr.Markdown(
        """
        <div class="header">
          <h2 style="margin:0">Doctra - Document Parser</h2>
          <div class="subtitle">Parse PDFs, extract tables/charts, preview markdown, and download outputs.</div>
        </div>
        """
    )
    # Full Parse Tab
    with gr.Tab("Full Parse"):
        with gr.Row():
            pdf = gr.File(file_types=[".pdf"], label="PDF")
            use_vlm = gr.Checkbox(label="Use VLM (optional)", value=False)
            vlm_provider = gr.Dropdown(["openai", "anthropic", "openrouter", "ollama"], value="openai", label="VLM Provider")
            vlm_api_key = gr.Textbox(type="password", label="VLM API Key", placeholder="Optional if VLM disabled")
        with gr.Accordion("Advanced", open=False):
            with gr.Row():
                layout_model = gr.Textbox(value="PP-DocLayout_plus-L", label="Layout model")
                dpi = gr.Slider(100, 400, value=200, step=10, label="DPI")
                min_score = gr.Slider(0, 1, value=0.0, step=0.05, label="Min layout score")
            with gr.Row():
                ocr_lang = gr.Textbox(value="eng", label="OCR Language")
                ocr_psm = gr.Slider(0, 13, value=4, step=1, label="Tesseract PSM")
                ocr_oem = gr.Slider(0, 3, value=3, step=1, label="Tesseract OEM")
            with gr.Row():
                ocr_config = gr.Textbox(value="", label="Extra OCR config")
                box_sep = gr.Textbox(value="\n", label="Box separator")
        run_btn = gr.Button("▶ Run Full Parse", variant="primary")
        status = gr.Textbox(label="Status", elem_classes=["status-ok"])

        # Full Parse preview components
        with gr.Row():
            with gr.Column():
                md_preview = gr.HTML(label="Extracted Content", visible=True, elem_classes=["page-content"])
            with gr.Column():
                page_image = gr.Image(label="Page image", interactive=False)
        files_out = gr.Files(label="Download individual output files")
        zip_out = gr.File(label="Download all outputs (ZIP)")
        def _full_parse_adapter(f, *args):
            # run_full_parse returns (status, markdown, gallery_items, file_paths,
            # zip_path); only four components are bound below, so the unused
            # gallery list is dropped here. getattr() accepts both Gradio's
            # tempfile-wrapper and plain-filepath return styles for gr.File.
            pdf_path = getattr(f, "name", f) if f else ""
            status_msg, md, _gallery, files, zip_path = run_full_parse(pdf_path, *args)
            return status_msg, md, files, zip_path

        run_btn.click(
            fn=_full_parse_adapter,
            inputs=[pdf, use_vlm, vlm_provider, vlm_api_key, layout_model, dpi, min_score, ocr_lang, ocr_psm, ocr_oem, ocr_config, box_sep],
            outputs=[status, md_preview, files_out, zip_out],
        )
    # Tables & Charts Tab
    with gr.Tab("Extract Tables/Charts"):
        with gr.Row():
            pdf_e = gr.File(file_types=[".pdf"], label="PDF")
            target = gr.Dropdown(["tables", "charts", "both"], value="both", label="Target")
            use_vlm_e = gr.Checkbox(label="Use VLM (optional)", value=False)
            vlm_provider_e = gr.Dropdown(["openai", "anthropic", "openrouter", "ollama"], value="openai", label="VLM Provider")
            vlm_api_key_e = gr.Textbox(type="password", label="VLM API Key", placeholder="Optional if VLM disabled")
        with gr.Accordion("Advanced", open=False):
            with gr.Row():
                layout_model_e = gr.Textbox(value="PP-DocLayout_plus-L", label="Layout model")
                dpi_e = gr.Slider(100, 400, value=200, step=10, label="DPI")
                min_score_e = gr.Slider(0, 1, value=0.0, step=0.05, label="Min layout score")
        run_btn_e = gr.Button("▶ Run Extraction", variant="primary")
        status_e = gr.Textbox(label="Status")
        with gr.Row():
            with gr.Column():
                tables_preview_e = gr.HTML(label="Extracted Data", elem_classes=["page-content"])
            with gr.Column():
                image_e = gr.Image(label="Selected Image", interactive=False)
        files_out_e = gr.Files(label="Download individual output files")
        zip_out_e = gr.File(label="Download all outputs (ZIP)")
        def _extract_adapter(f, *args):
            # run_extract returns (status, tables_html, gallery_items, file_paths,
            # zip_path); the gallery list has no bound component on this tab and
            # is dropped so the arity matches the four outputs below.
            pdf_path = getattr(f, "name", f) if f else ""
            status_msg, tables_html, _gallery, files, zip_path = run_extract(pdf_path, *args)
            return status_msg, tables_html, files, zip_path

        run_btn_e.click(
            fn=_extract_adapter,
            inputs=[pdf_e, target, use_vlm_e, vlm_provider_e, vlm_api_key_e, layout_model_e, dpi_e, min_score_e],
            outputs=[status_e, tables_preview_e, files_out_e, zip_out_e],
        )
    # DocRes Image Restoration Tab
    with gr.Tab("DocRes Image Restoration"):
        with gr.Row():
            pdf_docres = gr.File(file_types=[".pdf"], label="PDF")
            docres_task_standalone = gr.Dropdown(
                ["appearance", "dewarping", "deshadowing", "deblurring", "binarization", "end2end"],
                value="appearance",
                label="Restoration Task"
            )
            docres_device_standalone = gr.Dropdown(
                ["auto", "cuda", "cpu"],
                value="auto",
                label="Device"
            )
        with gr.Row():
            docres_dpi = gr.Slider(100, 400, value=200, step=10, label="DPI")
            docres_save_enhanced = gr.Checkbox(label="Save Enhanced PDF", value=True)
            docres_save_images = gr.Checkbox(label="Save Enhanced Images", value=True)
        run_docres_btn = gr.Button("▶ Run DocRes Restoration", variant="primary")
        docres_status = gr.Textbox(label="Status", elem_classes=["status-ok"])
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📄 Original PDF")
                docres_original_pdf = gr.File(label="Original PDF File", interactive=False, visible=False)
                docres_original_page_image = gr.Image(label="Original PDF Page", interactive=False, height=800)
            with gr.Column():
                gr.Markdown("### ✨ Enhanced PDF")
                docres_enhanced_pdf = gr.File(label="Enhanced PDF File", interactive=False, visible=False)
                docres_enhanced_page_image = gr.Image(label="Enhanced PDF Page", interactive=False, height=800)
        docres_files_out = gr.Files(label="Download enhanced files")
        def _docres_adapter(f, *args):
            # run_docres_restoration returns (status, original_pdf, enhanced_pdf,
            # metadata, file_paths); metadata has no bound component and is dropped.
            pdf_path = getattr(f, "name", f) if f else ""
            status_msg, original_pdf, enhanced_pdf, _metadata, files = run_docres_restoration(pdf_path, *args)
            return status_msg, original_pdf, enhanced_pdf, files

        run_docres_btn.click(
            fn=_docres_adapter,
            inputs=[pdf_docres, docres_task_standalone, docres_device_standalone, docres_dpi, docres_save_enhanced, docres_save_images],
            outputs=[docres_status, docres_original_pdf, docres_enhanced_pdf, docres_files_out]
        )
    # Enhanced Parser Tab
    with gr.Tab("Enhanced Parser"):
        with gr.Row():
            pdf_enhanced = gr.File(file_types=[".pdf"], label="PDF")
            use_image_restoration = gr.Checkbox(label="Use Image Restoration", value=True)
            restoration_task = gr.Dropdown(
                ["appearance", "dewarping", "deshadowing", "deblurring", "binarization", "end2end"],
                value="appearance",
                label="Restoration Task"
            )
            restoration_device = gr.Dropdown(
                ["auto", "cuda", "cpu"],
                value="auto",
                label="Restoration Device"
            )
        with gr.Row():
            use_vlm_enhanced = gr.Checkbox(label="Use VLM (optional)", value=False)
            vlm_provider_enhanced = gr.Dropdown(["openai", "anthropic", "openrouter", "ollama"], value="openai", label="VLM Provider")
            vlm_api_key_enhanced = gr.Textbox(type="password", label="VLM API Key", placeholder="Optional if VLM disabled")
        with gr.Accordion("Advanced Settings", open=False):
            with gr.Row():
                restoration_dpi = gr.Slider(100, 400, value=200, step=10, label="Restoration DPI")
                layout_model_enhanced = gr.Textbox(value="PP-DocLayout_plus-L", label="Layout model")
                dpi_enhanced = gr.Slider(100, 400, value=200, step=10, label="Processing DPI")
                min_score_enhanced = gr.Slider(0, 1, value=0.0, step=0.05, label="Min layout score")
            with gr.Row():
                ocr_lang_enhanced = gr.Textbox(value="eng", label="OCR Language")
                ocr_psm_enhanced = gr.Slider(0, 13, value=4, step=1, label="Tesseract PSM")
                ocr_oem_enhanced = gr.Slider(0, 3, value=3, step=1, label="Tesseract OEM")
            with gr.Row():
                ocr_config_enhanced = gr.Textbox(value="", label="Extra OCR config")
                box_sep_enhanced = gr.Textbox(value="\n", label="Box separator")
        run_enhanced_btn = gr.Button("▶ Run Enhanced Parse", variant="primary")
        enhanced_status = gr.Textbox(label="Status", elem_classes=["status-ok"])
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📄 Original PDF")
                enhanced_original_pdf = gr.File(label="Original PDF File", interactive=False, visible=False)
                enhanced_original_page_image = gr.Image(label="Original PDF Page", interactive=False, height=600)
            with gr.Column():
                gr.Markdown("### ✨ Enhanced PDF")
                enhanced_enhanced_pdf = gr.File(label="Enhanced PDF File", interactive=False, visible=False)
                enhanced_enhanced_page_image = gr.Image(label="Enhanced PDF Page", interactive=False, height=600)
        with gr.Row():
            enhanced_md_preview = gr.HTML(label="Extracted Content", visible=True, elem_classes=["page-content"])
        enhanced_files_out = gr.Files(label="Download individual output files")
        enhanced_zip_out = gr.File(label="Download all outputs (ZIP)")
        def _enhanced_adapter(f, *args):
            # run_enhanced_parse returns seven values; the trailing output-directory
            # string has no bound component here, so only the first six are passed on.
            pdf_path = getattr(f, "name", f) if f else ""
            return run_enhanced_parse(pdf_path, *args)[:6]

        run_enhanced_btn.click(
            fn=_enhanced_adapter,
            inputs=[
                pdf_enhanced, use_image_restoration, restoration_task, restoration_device, restoration_dpi,
                use_vlm_enhanced, vlm_provider_enhanced, vlm_api_key_enhanced, layout_model_enhanced,
                dpi_enhanced, min_score_enhanced, ocr_lang_enhanced, ocr_psm_enhanced, ocr_oem_enhanced,
                ocr_config_enhanced, box_sep_enhanced
            ],
            outputs=[
                enhanced_status, enhanced_md_preview, enhanced_files_out, enhanced_zip_out,
                enhanced_original_pdf, enhanced_enhanced_pdf
            ]
        )
    # Tips section
    gr.Markdown(create_tips_markdown())
if __name__ == "__main__":
    # Launch the interface
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", "7860")),
        share=False
    )