import gradio as gr import subprocess import os import base64 from typing import List, Dict, Any from pathlib import Path from datetime import datetime class MusicRecognitionMCPServer: def __init__(self, allowed_directories: List[str] = None): """Initialize MCP Server with configurable file access""" self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"] self.processed_files = {} # Cache of processed results # Convert relative paths to absolute and check access abs_directories = [] for directory in self.allowed_directories: # Convert relative paths to absolute if not os.path.isabs(directory): directory = os.path.abspath(directory) abs_directories.append(directory) print(f"Checking directory: {directory}") # Only try to create if it doesn't exist and we have permission if not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) os.chmod(directory, 0o755) print(f"✅ Directory created: {directory}") except PermissionError: print(f"⚠️ Directory doesn't exist but can't create: {directory}") print(f" This is normal for system directories like /tmp") except Exception as e: print(f"⚠️ Warning creating {directory}: {e}") else: print(f"✅ Directory exists: {directory}") self.allowed_directories = abs_directories print(f"Final allowed directories: {self.allowed_directories}") # Test Audiveris installation self._test_audiveris() def list_resources(self) -> List[Dict[str, Any]]: """List available resources following MCP patterns""" resources = [] # Add processed files as resources for file_id, file_info in self.processed_files.items(): resources.append({ "uri": f"musicxml://{file_id}", "name": file_info["original_name"], "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}", "mimeType": "application/vnd.recordare.musicxml+xml" }) # Add available PDF files in allowed directories for directory in self.allowed_directories: if os.path.exists(directory): for file_path in Path(directory).rglob("*.pdf"): if self._is_file_accessible(str(file_path)): resources.append({ "uri": f"file://{file_path}", "name": file_path.name, "description": f"PDF music score available for processing: {file_path.name}", "mimeType": "application/pdf" }) return resources def read_resource(self, uri: str) -> Dict[str, Any]: """Read resource content following MCP patterns""" if uri.startswith("musicxml://"): # Return processed MusicXML file file_id = uri.replace("musicxml://", "") if file_id in self.processed_files: file_info = self.processed_files[file_id] try: with open(file_info["output_path"], "rb") as f: content = base64.b64encode(f.read()).decode() return { "contents": [{ "type": "resource", "resource": { "uri": uri, "text": content, "mimeType": "application/vnd.recordare.musicxml+xml" } }] } except FileNotFoundError: raise Exception(f"MusicXML file not found: {file_info['output_path']}") else: raise Exception(f"Resource not found: {uri}") elif uri.startswith("file://"): # Return PDF file content file_path = uri.replace("file://", "") if not self._is_file_accessible(file_path): raise Exception(f"File access denied: {file_path}") try: with open(file_path, "rb") as f: content = base64.b64encode(f.read()).decode() return { "contents": [{ "type": "resource", "resource": { "uri": uri, "text": content, "mimeType": "application/pdf" } }] } except FileNotFoundError: raise Exception(f"File not found: {file_path}") else: raise Exception(f"Unsupported URI scheme: {uri}") def _is_file_accessible(self, file_path: str) -> bool: """Check if file is within allowed directories""" abs_path = os.path.abspath(file_path) return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories) def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]: """Tool for music recognition following MCP patterns""" # Handle different URI formats if pdf_uri.startswith("file://"): pdf_path = pdf_uri.replace("file://", "") elif pdf_uri.startswith("data:"): # Handle data URIs (base64 encoded) return self._process_data_uri(pdf_uri, output_dir) else: # Assume it's a direct file path pdf_path = pdf_uri if not self._is_file_accessible(pdf_path): raise Exception(f"File access denied: {pdf_path}") if not os.path.exists(pdf_path): raise Exception(f"PDF file not found: {pdf_path}") try: output_file = self._recognize_music_core(pdf_path, output_dir) # Store in processed files cache file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}" self.processed_files[file_id] = { "original_name": os.path.basename(pdf_path), "original_path": pdf_path, "output_path": output_file, "processed_at": datetime.now().isoformat(), "file_id": file_id } # Return MCP-compliant response return { "content": [{ "type": "text", "text": f"✅ Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n" f"📁 Output file: {output_file}\n" f"🔗 Resource URI: musicxml://{file_id}\n" f"📊 File size: {os.path.getsize(output_file)} bytes\n\n" f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`" }], "isError": False } except Exception as e: return { "content": [{ "type": "text", "text": f"❌ Music recognition failed: {str(e)}" }], "isError": True } def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]: """Process base64 encoded data URI""" try: # Parse data URI: data:application/pdf;base64, header, data = data_uri.split(',', 1) mime_type = header.split(';')[0].replace('data:', '') if mime_type != 'application/pdf': raise Exception(f"Unsupported MIME type: {mime_type}") # Fix base64 padding if needed data = self._fix_base64_padding(data) # Decode base64 data pdf_data = base64.b64decode(data) # Save to temporary file temp_dir = output_dir or "/tmp" temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf") with open(temp_pdf, 'wb') as f: f.write(pdf_data) # Process the file return self.recognize_music_tool(f"file://{temp_pdf}", output_dir) except Exception as e: raise Exception(f"Failed to process data URI: {str(e)}") def _fix_base64_padding(self, data: str) -> str: """Fix base64 padding to make it valid""" # Remove any whitespace data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '') # Add padding if needed missing_padding = len(data) % 4 if missing_padding: data += '=' * (4 - missing_padding) return data def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str: """Core music recognition function""" audiveris = "/opt/audiveris/bin/Audiveris" if output_dir is None: output_dir = "/tmp" # Ensure output directory exists with proper permissions os.makedirs(output_dir, exist_ok=True) try: os.chmod(output_dir, 0o755) except Exception as e: print(f"Warning: Could not set permissions for {output_dir}: {e}") if not self._is_file_accessible(output_dir): raise Exception(f"Output directory access denied: {output_dir}") # Verify input file exists if not os.path.exists(pdf_file_path): raise Exception(f"Input PDF file not found: {pdf_file_path}") pdf_file_name = os.path.basename(pdf_file_path) pdf_name_without_ext = os.path.splitext(pdf_file_name)[0] # Try both possible extensions possible_extensions = [".mxl", ".xml", ".musicxml"] output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions] cmd = [ audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path ] print(f"Running Audiveris command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) print(f"Audiveris stdout: {result.stdout}") print(f"Audiveris stderr: {result.stderr}") print(f"Audiveris return code: {result.returncode}") # List files in output directory for debugging if os.path.exists(output_dir): files_in_output = os.listdir(output_dir) print(f"Files in output directory: {files_in_output}") # Check if any of the possible output files exist existing_output = None for output_file in output_files: if os.path.exists(output_file): existing_output = output_file break if existing_output: print(f"Found output file: {existing_output}") return existing_output # If no output file found, provide detailed error error_msg = f"Audiveris processing failed.\n" error_msg += f"Return code: {result.returncode}\n" error_msg += f"Stdout: {result.stdout}\n" error_msg += f"Stderr: {result.stderr}\n" error_msg += f"Expected files: {output_files}\n" error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n" raise Exception(error_msg) def _test_audiveris(self): """Test if Audiveris is properly installed""" audiveris = "/opt/audiveris/bin/Audiveris" if not os.path.exists(audiveris): print(f"⚠️ Warning: Audiveris not found at {audiveris}") return False try: # Test Audiveris with help command result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10) if "Audiveris" in result.stdout or "Audiveris" in result.stderr: print("✅ Audiveris installation verified") return True else: print(f"⚠️ Warning: Audiveris may not be working properly") print(f"Output: {result.stdout}") print(f"Error: {result.stderr}") return False except Exception as e: print(f"⚠️ Warning: Could not test Audiveris: {e}") return False # Initialize MCP Server with proper absolute paths mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"]) def recognize_music_gradio(pdf_file): """Gradio wrapper for music recognition""" try: print(f"Processing file: {pdf_file.name}") result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}") if result.get("isError"): error_msg = result["content"][0]["text"] print(f"Error in music recognition: {error_msg}") return None # Extract file ID from the response response_text = result["content"][0]["text"] print(f"Response text: {response_text}") if "musicxml://" in response_text: file_id = response_text.split("musicxml://")[1].split("`")[0] print(f"Extracted file ID: {file_id}") if file_id in mcp_server.processed_files: file_info = mcp_server.processed_files[file_id] output_path = file_info["output_path"] print(f"Output path from cache: {output_path}") if os.path.exists(output_path): print(f"✅ File exists: {output_path}") return output_path else: print(f"❌ File not found: {output_path}") # If the above doesn't work, try to find the file directly pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0] possible_files = [ f"/tmp/{pdf_basename}.mxl", f"/tmp/{pdf_basename}.xml", f"/tmp/{pdf_basename}.musicxml" ] for file_path in possible_files: print(f"Checking: {file_path}") if os.path.exists(file_path): print(f"✅ Found file: {file_path}") return file_path print("❌ No output file found in any expected location") print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}") return None except Exception as e: print(f"Exception in Gradio wrapper: {str(e)}") import traceback traceback.print_exc() return None # Create enhanced Gradio interface with custom CSS and better UX custom_css = """ .gradio-container { max-width: 1200px !important; margin: auto !important; } .main-header { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 2rem; border-radius: 15px; margin-bottom: 2rem; box-shadow: 0 8px 32px rgba(0,0,0,0.1); } .feature-card { background: white; border-radius: 12px; padding: 1.5rem; margin: 1rem 0; box-shadow: 0 4px 16px rgba(0,0,0,0.1); border-left: 4px solid #667eea; } .upload-area { border: 2px dashed #667eea; border-radius: 12px; padding: 2rem; text-align: center; background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); transition: all 0.3s ease; } .upload-area:hover { border-color: #764ba2; background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); } .status-success { background: linear-gradient(135deg, #4caf50 0%, #45a049 100%); color: white; padding: 1rem; border-radius: 8px; margin: 1rem 0; } .status-error { background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%); color: white; padding: 1rem; border-radius: 8px; margin: 1rem 0; } .info-box { background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); border-radius: 8px; padding: 1rem; margin: 1rem 0; border-left: 4px solid #2196f3; } """ def create_gradio_interface(): with gr.Blocks(css=custom_css, title="🎼 Audiveris Music Score Recognition", theme=gr.themes.Soft()) as interface: # Header gr.HTML("""

🎼 Audiveris Music Score Recognition

Transform your PDF music scores into editable MusicXML files using advanced AI recognition

""") # Main content area with gr.Row(): with gr.Column(scale=1): gr.HTML("""

✨ Features

""") gr.HTML("""

📋 How to use:

  1. Upload your PDF music score
  2. Click "🎵 Convert to MusicXML"
  3. Wait for processing to complete
  4. Download your MusicXML file
""") with gr.Column(scale=2): # File upload section gr.HTML("

📁 Upload Your Music Score

") pdf_input = gr.File( file_types=[".pdf"], label="Select PDF File", file_count="single", height=200, elem_classes=["upload-area"] ) # Processing button convert_btn = gr.Button( "🎵 Convert to MusicXML", variant="primary", size="lg", scale=1 ) # Status and progress status_display = gr.HTML(visible=False) progress_bar = gr.Progress() # Output section gr.HTML("

📥 Download Results

") output_file = gr.File( label="MusicXML Output", visible=False, height=100 ) # Processing info processing_info = gr.Textbox( label="Processing Details", lines=8, visible=False, interactive=False ) # Footer gr.HTML("""

Powered by Audiveris • Built with ❤️ using Gradio

For best results, use high-quality PDF scans with clear musical notation

""") # Enhanced processing function with better feedback def process_with_feedback(pdf_file, progress=gr.Progress()): if pdf_file is None: return ( gr.HTML("
❌ Please upload a PDF file first!
", visible=True), None, gr.Textbox(visible=False), gr.File(visible=False) ) try: # Show processing status progress(0.1, desc="📄 Analyzing PDF file...") status_html = """

🔄 Processing your music score...

File: {}

Size: {:.2f} MB

Please wait while Audiveris analyzes your score...

""".format( pdf_file.name.split('/')[-1], os.path.getsize(pdf_file.name) / (1024*1024) ) progress(0.3, desc="🎵 Running Audiveris recognition...") # Process the file result_file = recognize_music_gradio(pdf_file) progress(0.9, desc="✅ Finalizing results...") if result_file and os.path.exists(result_file): # Success success_html = """

✅ Conversion completed successfully!

📁 Output: {}

📊 Size: {:.2f} KB

🎉 Your MusicXML file is ready for download!

""".format( os.path.basename(result_file), os.path.getsize(result_file) / 1024 ) # Processing details details = f"""✅ CONVERSION SUCCESSFUL 📄 Input File: {pdf_file.name.split('/')[-1]} 📊 Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB 🎵 Output File: {os.path.basename(result_file)} 📊 Output Size: {os.path.getsize(result_file) / 1024:.2f} KB ⏱️ Processing completed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 🎼 Your PDF music score has been successfully converted to MusicXML format! You can now download the file and use it in music notation software like MuseScore, Finale, or Sibelius.""" progress(1.0, desc="🎉 Complete!") return ( gr.HTML(success_html, visible=True), gr.File(result_file, visible=True), gr.Textbox(details, visible=True), gr.File(visible=True) ) else: # Failure error_html = """

❌ Conversion failed

The music recognition process encountered an error.

Please check that your PDF contains clear musical notation and try again.

""" error_details = f"""❌ CONVERSION FAILED 📄 Input File: {pdf_file.name.split('/')[-1]} 📊 Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB ⚠️ Error: No output file was generated by Audiveris ⏱️ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 💡 Troubleshooting tips: • Ensure your PDF contains clear, high-quality musical notation • Check that the PDF is not password-protected • Try with a different PDF file • Make sure the musical notation is not handwritten""" return ( gr.HTML(error_html, visible=True), None, gr.Textbox(error_details, visible=True), gr.File(visible=False) ) except Exception as e: # Exception handling error_html = f"""

❌ Processing Error

An unexpected error occurred: {str(e)}

Please try again or contact support if the problem persists.

""" error_details = f"""❌ PROCESSING ERROR 📄 Input File: {pdf_file.name.split('/')[-1] if pdf_file else 'Unknown'} ⚠️ Error: {str(e)} ⏱️ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 🔧 Technical Details: {str(e)} Please try again with a different file or contact support.""" return ( gr.HTML(error_html, visible=True), None, gr.Textbox(error_details, visible=True), gr.File(visible=False) ) # Connect the button to the processing function convert_btn.click( fn=process_with_feedback, inputs=[pdf_input], outputs=[status_display, output_file, processing_info, output_file], show_progress=True ) # Auto-hide status when new file is uploaded pdf_input.change( fn=lambda: (gr.HTML(visible=False), gr.Textbox(visible=False), gr.File(visible=False)), outputs=[status_display, processing_info, output_file] ) return interface # Create the enhanced interface gradio_interface = create_gradio_interface() # Removed run_gradio function - now running directly in main thread if __name__ == "__main__": print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print() print("🎵 MCP-Compliant Music Recognition Service Starting...") print("📱 Gradio UI: http://localhost:7860") # Run Gradio directly in the main thread to keep the application alive try: gradio_interface.launch( server_name="0.0.0.0", server_port=7860, mcp_server=True, share=True, # Set to False for container deployment prevent_thread_lock=False # Allow blocking to keep main thread alive ) except Exception as e: print(f"❌ Failed to start Gradio interface: {e}") import traceback traceback.print_exc()