Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import shutil | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urljoin, urlparse | |
| from queue import Queue | |
| import time | |
| import zipfile | |
| import tempfile | |
| import sys | |
| import logging | |
| import traceback | |
| import pypandoc | |
# --- Configuration & Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Browser-like User-Agent string so documentation hosts don't reject the crawler.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36'
REQUEST_TIMEOUT = 20 # seconds
POLITENESS_DELAY = 0.3 # seconds between requests
| # --- Pandoc Check --- | |
def check_pandoc_available():
    """Return True when pypandoc can locate a Pandoc executable, else False.

    Logs actionable install hints when the executable or the pypandoc
    library itself is missing.
    """
    try:
        pandoc_path = pypandoc.get_pandoc_path()
    except OSError:
        # Raised by pypandoc when no pandoc binary can be found on PATH.
        logging.error("pypandoc could not find Pandoc executable.")
        logging.error("Please ensure Pandoc is installed OR install 'pypandoc_binary' (`pip install pypandoc_binary`)")
        return False
    except ImportError:
        logging.error("pypandoc library not found. Please install it (`pip install pypandoc_binary`).")
        return False
    logging.info(f"pypandoc found Pandoc executable at: {pandoc_path}")
    return True
| # --- Core Functions --- | |
def fetch_html(url):
    """Download a page and return its decoded text, or None on any failure.

    Failures (timeout, HTTP error, anything unexpected) are logged and
    swallowed so the crawler can continue with the next URL.
    """
    try:
        response = requests.get(
            url,
            timeout=REQUEST_TIMEOUT,
            headers={'User-Agent': USER_AGENT},
        )
        response.raise_for_status()
        # Prefer the sniffed encoding; fall back to UTF-8 when detection fails.
        response.encoding = response.apparent_encoding or 'utf-8'
        logging.info(f"Successfully fetched: {url}")
        return response.text
    except requests.exceptions.Timeout:
        logging.error(f"Timeout fetching URL: {url}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching URL {url}: {e}")
    except Exception as e:
        logging.error(f"Unexpected error fetching {url}: {e}")
    return None
def convert_html_to_md(html_content, output_md_path, pandoc_output_format, pandoc_extra_args):
    """Convert an HTML string to a Markdown file on disk via pypandoc.

    Args:
        html_content: Raw HTML source to convert.
        output_md_path: Destination path for the generated Markdown file.
        pandoc_output_format: Pandoc writer spec (e.g. 'gfm+hard_line_breaks').
        pandoc_extra_args: Extra command-line arguments passed to pandoc.

    Returns:
        True on success, False when the input is empty or conversion fails.
    """
    if not html_content:
        logging.warning(f"Empty HTML content for {output_md_path}. Conversion skipped.")
        return False
    # '+smart' on the reader improves handling of typographic characters.
    source_format = 'html+smart'
    try:
        logging.debug(f"pypandoc converting to {pandoc_output_format} with args: {pandoc_extra_args}")
        # pypandoc invokes pandoc with the string as stdin; with outputfile
        # set, convert_text returns "" on success.
        result = pypandoc.convert_text(
            source=html_content,
            to=pandoc_output_format,
            format=source_format,
            outputfile=output_md_path,
            extra_args=pandoc_extra_args,
            encoding='utf-8'
        )
        if result == "":
            logging.info(f"Successfully converted using pypandoc -> {os.path.basename(output_md_path)}")
            return True
        logging.error(f"pypandoc conversion to {output_md_path} returned unexpected non-empty output.")
        if os.path.exists(output_md_path) and os.path.getsize(output_md_path) == 0:
            logging.warning(f"Output file {output_md_path} was created but is empty.")
        return False
    except Exception as e:
        logging.error(f"Error during pypandoc conversion for {output_md_path}: {e}")
        logging.error(traceback.format_exc())
        # Best-effort cleanup of a zero-byte file left by the failed run.
        if os.path.exists(output_md_path) and os.path.getsize(output_md_path) == 0:
            try:
                os.remove(output_md_path)
                logging.info(f"Removed empty/failed output file: {os.path.basename(output_md_path)}")
            except OSError as remove_err:
                logging.warning(f"Could not remove empty/failed output file {output_md_path}: {remove_err}")
        return False
| # --- Function for direct HTML to Markdown conversion --- | |
def convert_html_text_to_md_string(html_content, pandoc_output_format, pandoc_extra_args):
    """Convert an HTML string directly to a Markdown string via pypandoc.

    Returns:
        A (markdown, status_message) pair; markdown is None on failure.
    """
    if not html_content or not html_content.strip():
        logging.warning("Input HTML content is empty. Conversion skipped.")
        return None, "Error: HTML content cannot be empty."
    try:
        logging.debug(f"pypandoc converting text to {pandoc_output_format} with args: {pandoc_extra_args}")
        markdown_text = pypandoc.convert_text(
            source=html_content,
            to=pandoc_output_format,
            format='html+smart',  # '+smart' improves typographic character handling
            extra_args=pandoc_extra_args,
            encoding='utf-8'
        )
    except Exception as e:
        error_msg = f"Error during pypandoc conversion: {e}"
        logging.error(error_msg)
        logging.error(traceback.format_exc())
        return None, error_msg
    logging.info("Successfully converted HTML text to Markdown string.")
    return markdown_text, "Conversion successful."
def create_zip_archive(source_dir, output_zip_path):
    """Package every file under source_dir into a deflate-compressed ZIP.

    Entries are stored with paths relative to source_dir, so unpacking
    reproduces the directory layout without the staging prefix.

    Returns:
        True on success, False if archiving failed (error is logged).
    """
    try:
        with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
            for dirpath, _, filenames in os.walk(source_dir):
                for name in filenames:
                    full_path = os.path.join(dirpath, name)
                    archive.write(full_path, os.path.relpath(full_path, source_dir))
        logging.info(f"Successfully created ZIP archive: {output_zip_path}")
        return True
    except Exception as e:
        logging.error(f"Failed to create ZIP archive {output_zip_path}: {e}")
        return False
# --- Main Gradio Function (handles both modes) ---
# The function now handles both URL and direct HTML text input.
# It needs to be a generator (`yield`) to support progress updates in URL mode.
def process_conversion_request(
    input_type, start_url_str, html_text_input,
    restrict_path, use_aggressive_conversion,
    progress=gr.Progress(track_tqdm=True)
):
    """The main function triggered by the Gradio interface, handling both modes.

    Generator: each `yield` emits a 4-tuple matching the wired outputs —
    (log_text, zip_file_path, md_file_path, markdown_preview_component) —
    so the UI can stream progress while the URL crawl runs.

    Args:
        input_type: Radio value, "Convert from URL" or "Convert from HTML Text".
        start_url_str: Starting URL for crawl mode (ignored in text mode).
        html_text_input: Raw HTML source for text mode (ignored in URL mode).
        restrict_path: When True, only crawl URLs whose path stays under the
            start URL's directory (see restriction logic below).
        use_aggressive_conversion: When True, disable raw-HTML passthrough and
            force ATX headings in the pandoc output.
        progress: Gradio progress tracker (injected by Gradio at call time).
    """
    # --- 0. Check Pandoc Availability ---
    if not check_pandoc_available():
        error_msg = "Error: Pandoc executable not found. Please ensure Pandoc is installed or run `pip install pypandoc_binary`."
        # Yield a final state for all outputs
        yield error_msg, None, None, gr.Markdown(visible=False)
        return
    # --- Determine Pandoc Settings based on Checkbox ---
    # wrap=none, Prevent auto-wrapping lines
    if use_aggressive_conversion:
        pandoc_format_to_use = 'gfm-raw_html+hard_line_breaks'
        pandoc_args_to_use = ['--wrap=none', '--markdown-headings=atx']
        conversion_mode_msg = "Using aggressive conversion mode (disabling raw HTML, using ATX headers)."
    else:
        # Using gfm+hard_line_breaks ensures GitHub compatibility and respects single newlines
        pandoc_format_to_use = 'gfm+hard_line_breaks'
        pandoc_args_to_use = ['--wrap=none']
        conversion_mode_msg = "Using standard conversion mode (may preserve more raw HTML)."
    logging.info(conversion_mode_msg)  # Log the mode
    # --- MODE 1: Convert from URL ---
    if input_type == "Convert from URL":
        staging_dir = None  # Initialize to ensure it exists for the finally block
        try:
            # --- 1. Validate URL and Determine Restriction Path ---
            start_url_str = start_url_str.strip()
            if not start_url_str:
                yield "Error: Starting URL cannot be empty.", None, None, gr.Markdown(visible=False)
                return
            try:
                parsed_start_url = urlparse(start_url_str)
                if not parsed_start_url.scheme or not parsed_start_url.netloc:
                    raise ValueError("Invalid URL format (missing scheme or domain).")
                base_netloc = parsed_start_url.netloc
                base_scheme = parsed_start_url.scheme
                # Calculate the base directory path for comparison if restriction is enabled
                start_path_cleaned = parsed_start_url.path.strip('/')
                start_path_dir_for_restriction = None  # Initialize restriction path base
                if start_path_cleaned:  # If not root path
                    # Use os.path.dirname to get the directory part
                    # dirname('main/index.html') -> 'main'
                    # dirname('main') -> '' (This needs correction if start URL is like /main/)
                    # Let's adjust: if no '/' it means it's the first level dir or a root file
                    if '/' not in start_path_cleaned and '.' not in start_path_cleaned:
                        start_path_dir_for_restriction = start_path_cleaned  # e.g. 'main'
                    else:
                        start_path_dir_for_restriction = os.path.dirname(start_path_cleaned)  # e.g. 'main' from main/index.html, or '' from /index.html
                        if start_path_dir_for_restriction == '':  # Handle case like /index.html correctly
                            start_path_dir_for_restriction = None  # Treat like root, don't restrict path based on this
            except ValueError as e:
                yield f"Error: Invalid starting URL '{start_url_str}': {e}", None, None, gr.Markdown(visible=False)
                return
            # Log restriction status
            restriction_msg = f"Path restriction enabled: limiting to paths starting like '{start_path_dir_for_restriction}/'." if restrict_path and start_path_dir_for_restriction else "Path restriction disabled or starting from root."
            # --- 2. Setup Temporary Directory & Crawler ---
            staging_dir = tempfile.mkdtemp(prefix="md_convert_")
            logging.info(f"Created temporary directory: {staging_dir}")
            urls_to_process = Queue()
            processed_urls = set()  # Still needed to avoid duplicates
            urls_to_process.put(start_url_str)
            processed_urls.add(start_url_str)  # Add start URL here
            failed_urls = set()
            converted_count = 0
            url_count_estimate = 1  # Total unique URLs discovered so far (starts with the first one)
            dequeued_count = 0
            log_messages = ["Process started...", restriction_msg, conversion_mode_msg]
            yield "\n".join(log_messages), None, None, gr.Markdown(visible=False)
            # --- 3. Crawl and Convert Loop ---
            while not urls_to_process.empty():
                # --- Get URL and Increment Dequeued Count ---
                current_url = urls_to_process.get()
                dequeued_count += 1  # Increment when an item is taken for processing
                # --- Update Progress Bar ---
                # Calculate progress based on dequeued items vs. total discovered
                # Denominator is the total number of unique URLs added to processed_urls/queue so far
                denominator = max(1, url_count_estimate)  # url_count_estimate increases when new links are found
                current_progress_value = dequeued_count / denominator
                # Update Gradio progress - use dequeued_count for user display
                # Display: Processed X / Total_Discovered Y
                progress(current_progress_value, desc=f"Processing {dequeued_count}/{url_count_estimate}. Queue: {urls_to_process.qsize()}")
                # --- Process the current URL ---
                log_message = f"\nProcessing ({dequeued_count}/{url_count_estimate}): {current_url}"
                logging.info(log_message)
                log_messages.append(log_message)
                # Fetch HTML (politeness delay keeps request rate modest)
                time.sleep(POLITENESS_DELAY)
                html_content = fetch_html(current_url)
                if not html_content:
                    failed_urls.add(current_url)
                    log_message = f" -> Failed to fetch content."
                    logging.warning(log_message)
                    log_messages.append(log_message)
                    continue
                # Determine Output Path
                parsed_current_url = urlparse(current_url)
                # Get the path part of the URL, removing leading/trailing slashes
                url_path_segment = parsed_current_url.path.strip('/') or 'index'  # e.g., "main/index.html", "HEAD/index.html", ""
                # Now, determine the final .md filename based on the path base
                if url_path_segment.lower().endswith('.html'):
                    relative_md_filename = os.path.splitext(url_path_segment)[0] + ".md"
                else:
                    # If it's not empty and doesn't end with .html, assume it's a directory path
                    # Append 'index.md' to treat it like accessing a directory index
                    # e.g., if URL path was /main, url_path_segment is 'main', output becomes 'main/index.md'
                    # If URL path was /path/to/file (no .html), output becomes 'path/to/file.md' if '.' in basename, else 'path/to/file/index.md'
                    basename = os.path.basename(url_path_segment)
                    if '.' in basename:  # Check if it looks like a file without .html extension
                        relative_md_filename = url_path_segment + ".md"
                    else:  # Assume it's a directory reference
                        relative_md_filename = os.path.join(url_path_segment, "index.md")
                # Construct full path within the temporary staging directory
                output_md_full_path = os.path.join(staging_dir, relative_md_filename)
                os.makedirs(os.path.dirname(output_md_full_path), exist_ok=True)
                # Convert HTML to Markdown
                if convert_html_to_md(html_content, output_md_full_path, pandoc_format_to_use, pandoc_args_to_use):
                    converted_count += 1
                    log_messages.append(f" -> Converted successfully to {os.path.relpath(output_md_full_path, staging_dir)}")
                else:
                    failed_urls.add(current_url)
                    log_messages.append(" -> Conversion failed.")
                # Find and Add New Links
                # NOTE(review): only scheme+domain are checked below, so non-HTML
                # assets (images, archives) on the same host may also be queued.
                soup = BeautifulSoup(html_content, 'lxml')
                for link in soup.find_all('a', href=True):
                    # Resolve relative hrefs and drop any #fragment.
                    absolute_url = urljoin(current_url, link['href']).split('#', 1)[0]
                    parsed_absolute_url = urlparse(absolute_url)
                    # Basic Filtering (scheme, domain)
                    is_valid_target = (
                        parsed_absolute_url.scheme == base_scheme and
                        parsed_absolute_url.netloc == base_netloc)
                    if not is_valid_target: continue  # Skip invalid links early
                    # --- Path Restriction Check ---
                    path_restricted = False
                    # Only apply if checkbox is checked AND we derived a non-root restriction path
                    if restrict_path and start_path_dir_for_restriction:
                        candidate_path = parsed_absolute_url.path.strip('/')
                        # Check if the cleaned candidate path starts with the restriction dir + '/'
                        # OR if the candidate path is exactly the restriction dir (e.g. /main matching main)
                        if not (candidate_path.startswith(start_path_dir_for_restriction + '/') or candidate_path == start_path_dir_for_restriction):
                            path_restricted = True
                    # --- End Path Restriction Check ---
                    # Add to queue only if NOT restricted and NOT already processed
                    if not path_restricted and absolute_url not in processed_urls:
                        processed_urls.add(absolute_url)  # Add to set immediately
                        urls_to_process.put(absolute_url)
                        url_count_estimate += 1
            # --- 4. Create ZIP Archive ---
            progress(1.0, desc="Zipping files...")
            log_messages.append("\nCrawling complete. Creating ZIP file...")
            yield "\n".join(log_messages), None, None, gr.Markdown(visible=False)
            # delete=False keeps the temp zip alive for Gradio to serve as a download.
            with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as temp_zip:
                output_zip_path = temp_zip.name
            if create_zip_archive(staging_dir, output_zip_path):
                log_messages.append(f"\nProcess finished successfully!")
                log_messages.append(f"Converted {converted_count} pages using {'aggressive' if use_aggressive_conversion else 'standard'} mode.")
                if failed_urls:
                    log_messages.append(f"Failed to process {len(failed_urls)} URLs.")
                yield "\n".join(log_messages), output_zip_path, None, gr.Markdown(visible=False)
            else:
                log_messages.append("\nError: Failed to create the final ZIP archive.")
                yield "\n".join(log_messages), None, None, gr.Markdown(visible=False)
        except Exception as e:
            error_log = f"\nAn unexpected error occurred: {e}\n{traceback.format_exc()}"
            logging.error(error_log)
            yield error_log, None, None, gr.Markdown(visible=False)
        finally:
            # --- Cleanup: remove the staging directory regardless of outcome ---
            if staging_dir and os.path.exists(staging_dir):
                shutil.rmtree(staging_dir)
                logging.info(f"Cleaned up temporary directory: {staging_dir}")
    # --- MODE 2: Convert from HTML Text ---
    elif input_type == "Convert from HTML Text":
        log_messages = [f"Process started...", conversion_mode_msg]
        if not html_text_input or not html_text_input.strip():
            log_messages.append("Error: HTML content cannot be empty.")
            yield "\n".join(log_messages), None, None, gr.Markdown(visible=False)
            return
        progress(0.5, desc="Converting HTML text...")
        # Use the dedicated string conversion function
        markdown_output, status_msg = convert_html_text_to_md_string(
            html_text_input, pandoc_format_to_use, pandoc_args_to_use
        )
        log_messages.append(status_msg)
        progress(1.0, desc="Complete")
        if markdown_output is not None:
            # Create a temporary file for download (delete=False so Gradio can serve it)
            with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', suffix=".md", delete=False) as temp_md:
                temp_md.write(markdown_output)
                temp_md_path = temp_md.name
            log_messages.append("\nMarkdown has been generated. You can preview it below or download the file.")
            # Yield the final state: update logs, clear zip, show markdown preview, provide md file
            yield ("\n".join(log_messages),
                   None,
                   temp_md_path,
                   gr.Markdown(value=markdown_output, visible=True), )
        else:
            # Conversion failed, show logs and hide/clear other outputs
            yield ("\n".join(log_messages),
                   None,
                   None,
                   gr.Markdown(visible=False), )
| css = """ | |
| textarea[rows]:not([rows="1"]) { | |
| height: 250px; /* Give the HTML input box a fixed height */ | |
| overflow-y: auto !important; | |
| scrollbar-width: thin !important; | |
| } | |
| textarea[rows]:not([rows="1"])::-webkit-scrollbar { | |
| all: initial !important; | |
| background: #f1f1f1 !important; | |
| } | |
| textarea[rows]:not([rows="1"])::-webkit-scrollbar-thumb { | |
| all: initial !important; | |
| background: #a8a8a8 !important; | |
| } | |
| """ | |
# --- Gradio UI Definition ---
# Two mode-specific input/output groups are toggled by a radio selector;
# the single Start button drives process_conversion_request for both modes.
with gr.Blocks(title="HTML to Markdown Converter", css=css) as demo:
    gr.Markdown(
        """
        # HTML to Markdown Converter (via pypandoc)
        Choose an input method:
        1. **Convert from HTML Text**: Paste raw HTML source code directly to convert it into a single Markdown output.
        2. **Convert from URL**: Enter the starting `index.html` URL of an online documentation site. The script will crawl internal links, convert pages to Markdown, and package the results into a ZIP file.
        **This tool requires `pip install pypandoc_binary` to function correctly.**
        """
    )
    # --- Input type selector ---
    input_type_radio = gr.Radio(
        ["Convert from HTML Text", "Convert from URL", ],
        label="Input Type",
        value="Convert from HTML Text"
    )
    # --- HTML Text Mode UI ---
    with gr.Column(visible=True) as text_mode_ui:
        html_text_input = gr.Textbox(
            label="Paste HTML Source Code Here",
            lines=10,  # Give it a decent initial size
            placeholder="<html><body><h1>Title</h1><p>This is a paragraph.</p></body></html>"
        )
    # --- URL Mode UI ---
    with gr.Column(visible=False) as url_mode_ui:
        url_input = gr.Textbox(
            label="Starting Index HTML URL",
            placeholder="e.g., https://dghs-imgutils.deepghs.org/main/index.html"
        )
        restrict_path_checkbox = gr.Checkbox(
            label="Restrict crawl to starting path structure (e.g., if start is '/main/index.html', only crawl '/main/...' URLs)",
            value=True  # Default to restricting path
        )
    # --- Common Options ---
    with gr.Row():
        aggressive_md_checkbox = gr.Checkbox(
            label="Aggressive Markdown conversion (disable raw HTML, use ATX headers)",
            value=True  # Default to aggressive conversion
        )
    with gr.Row():
        start_button = gr.Button("Start Conversion", variant="primary")
    # --- HTML Text Mode Outputs ---
    with gr.Column(visible=True) as text_mode_outputs:
        gr.Markdown("---")
        gr.Markdown("### Markdown Conversion Result")
        md_output_file = gr.File(label="Download Markdown File (.md)")  # Download the single file
        md_output_display = gr.Markdown(label="Preview")  # Preview the result
    # --- URL Mode Outputs ---
    with gr.Column(visible=False) as url_mode_outputs:
        log_output = gr.Textbox(label="Progress Logs", lines=15, interactive=False, show_copy_button=True)
        zip_output = gr.File(label="Download Markdown Archive (ZIP)")
    # --- UI Logic to switch between modes ---
    def update_ui_visibility(input_type):
        # Show exactly one mode's input/output group; returning a dict keyed
        # by component lets Gradio match updates regardless of outputs order.
        is_url_mode = (input_type == "Convert from URL")
        return {
            text_mode_ui: gr.update(visible=not is_url_mode),
            text_mode_outputs: gr.update(visible=not is_url_mode),
            url_mode_ui: gr.update(visible=is_url_mode),
            url_mode_outputs: gr.update(visible=is_url_mode),
        }
    input_type_radio.change(
        fn=update_ui_visibility,
        inputs=input_type_radio,
        outputs=[url_mode_ui, text_mode_ui, url_mode_outputs, text_mode_outputs]
    )
    # --- Button click event wiring ---
    start_button.click(
        fn=process_conversion_request,
        inputs=[
            input_type_radio,
            url_input,
            html_text_input,
            restrict_path_checkbox,
            aggressive_md_checkbox
        ],
        # The function now needs to update all possible outputs
        outputs=[
            log_output,
            zip_output,
            md_output_file,
            md_output_display,
        ],
        show_progress="full"
    )
# --- Launch App ---
if __name__ == "__main__":
    # queue() enables request queuing so the generator's incremental yields
    # stream to the browser as progress updates.
    demo.queue()
    demo.launch(inbrowser=True)