"""MCP-Open Floor bridge.

Exposes a handful of functions through ``gr.api`` so that a Gradio app
launched with ``mcp_server=True`` can discover Open Floor agents, read their
manifests and forward utterances to them.

Agent URLs come from the ``OPENFLOOR_AGENTS`` environment variable and/or the
``x-openfloor-agents`` request header (both comma-separated).
"""

import json
import os
import traceback
from datetime import datetime
from typing import List, Dict, Any, Optional

import gradio as gr
import requests
from openfloor import (
    Envelope,
    Conversation,
    Sender,
    DialogEvent,
    TextFeature,
    UtteranceEvent,
    GetManifestsEvent,
    To,
)
from openfloor.manifest import *
from openfloor.envelope import *

# Identity this client presents in every envelope it sends.
_CLIENT_SPEAKER_URI = "openfloor://localhost/TestClient"
_CLIENT_SERVICE_URL = "http://localhost"

# Seconds to wait for an agent before the HTTP request is abandoned.
_REQUEST_TIMEOUT_SECONDS = 10

# Event types accepted as a manifest publication (singular and plural forms).
_MANIFEST_EVENT_TYPES = ("publishManifests", "publishManifest")

# Building blocks of the "how to configure agents" help text returned by tools.
_SETUP_HEADING = "**Setup Options:**"
_SETUP_ENV_OPTION = (
    '1. **Environment Variable**: '
    '`export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`'
)
_SETUP_HEADER_OPTION = (
    '2. **Request Header**: '
    '`x-openfloor-agents: https://agent1.com,https://agent2.com`'
)
_SETUP_BOTH_OPTION = "3. **Both**: Environment + header URLs will be combined"


def _split_csv(raw: str) -> List[str]:
    """Split a comma-separated string, trimming items and dropping blanks."""
    return [item.strip() for item in raw.split(",") if item.strip()]


def _capability_field(capability: Any, key: str) -> List[str]:
    """Return the string items stored under ``key`` of one capability.

    Tolerates a capability that is not a dict (returns ``[]``) and a field
    that holds a single string instead of a list (wraps it), so callers never
    iterate a string character by character.
    """
    if not isinstance(capability, dict):
        return []
    value = capability.get(key, [])
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str)]
    return []


def _no_agents_message(headline: str, include_both: bool = False) -> str:
    """Build the Markdown help text returned when no agent is available."""
    lines = [headline, "", _SETUP_HEADING, _SETUP_ENV_OPTION, _SETUP_HEADER_OPTION]
    if include_both:
        lines.append(_SETUP_BOTH_OPTION)
    return "\n".join(lines)


def _new_client_envelope():
    """Create an empty envelope whose sender is this test client."""
    conversation = Conversation()
    sender = Sender(speakerUri=_CLIENT_SPEAKER_URI, serviceUrl=_CLIENT_SERVICE_URL)
    return Envelope(conversation=conversation, sender=sender)


# Function to build the envelope for a manifest request using Open Floor library
def build_manifest_request_envelope(agent_url):
    """Return an envelope holding one ``GetManifestsEvent`` addressed to ``agent_url``."""
    print(f"🏗️ DEBUG: Building manifest envelope for {agent_url}")

    envelope = _new_client_envelope()
    print(f"🏗️ DEBUG: Created sender with speakerUri='{_CLIENT_SPEAKER_URI}'")
    print("🏗️ DEBUG: Created envelope")

    manifest_event = GetManifestsEvent(to=To(serviceUrl=agent_url, private=False))
    envelope.events.append(manifest_event)
    print(f"🏗️ DEBUG: Added GetManifestsEvent targeting {agent_url}")

    return envelope


# Function to build an utterance envelope using Open Floor library
def build_utterance_envelope(agent_url, message_text):
    """Return an envelope holding one text ``UtteranceEvent`` addressed to ``agent_url``."""
    envelope = _new_client_envelope()

    dialog_event = DialogEvent(
        speakerUri=_CLIENT_SPEAKER_URI,
        features={"text": TextFeature(values=[message_text])},
    )
    utterance_event = UtteranceEvent(
        dialogEvent=dialog_event,
        to=To(serviceUrl=agent_url, private=False),
    )
    envelope.events.append(utterance_event)

    return envelope


# Function to send the request to the agent
def send_request(agent_url, request_type, message_text="Hello, this is a test message!"):
    """POST a manifest or utterance envelope to ``agent_url``.

    Args:
        agent_url: Endpoint the envelope is posted to.
        request_type: ``"manifest"`` builds a manifest request; any other
            value builds an utterance carrying ``message_text``.
        message_text: Text of the utterance (ignored for manifest requests).

    Returns:
        A 4-tuple ``(payload_json, status_code, response, status_message)``.
        ``response`` is the decoded JSON body, or ``{"raw_response": text}``
        when the body is not JSON. On any exception the tuple is
        ``("", 500, {"error": ...}, "❌ Request failed: ...")``; this function
        does not raise.
    """
    print(f"🔄 DEBUG: send_request called with agent_url='{agent_url}', request_type='{request_type}'")

    try:
        if request_type == "manifest":
            envelope = build_manifest_request_envelope(agent_url)
            print(f"📦 DEBUG: Built manifest envelope for {agent_url}")
        else:  # utterance
            envelope = build_utterance_envelope(agent_url, message_text)
            print(f"📦 DEBUG: Built utterance envelope for {agent_url}")

        # Serialise with the library, then re-load so the envelope can be
        # nested under the "openFloor" key of the request body.
        json_str = envelope.to_json()
        print(f"📝 DEBUG: Envelope JSON length: {len(json_str)} chars")
        print(f"📝 DEBUG: Full envelope JSON: {json_str}")

        envelope_dict = json.loads(json_str)
        payload = {"openFloor": envelope_dict}
        json_payload_pretty = json.dumps(payload, indent=2)
        print(f"📤 DEBUG: Full request payload: {json_payload_pretty}")
        print(f"📤 DEBUG: Request payload preview: {json_payload_pretty[:200]}...")

        headers = {'Content-Type': 'application/json'}
        print(f"🌐 DEBUG: Sending {request_type} request to {agent_url}")
        print(f"🔧 DEBUG: Headers: {headers}")

        response = requests.post(
            agent_url, json=payload, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
        )

        print(f"📥 DEBUG: Response status: {response.status_code}")
        print(f"📥 DEBUG: Response headers: {dict(response.headers)}")
        print(f"📥 DEBUG: Response content length: {len(response.text)} chars")
        print(f"📥 DEBUG: Response preview: {response.text[:300]}...")

        try:
            response_json = response.json()
        except ValueError as json_err:
            # JSON decoding errors are ValueError subclasses; keep the raw
            # body so the caller can still inspect it.
            print(f"❌ DEBUG: JSON parse error: {json_err}")
            response_json = {"raw_response": response.text}
        else:
            print("✅ DEBUG: Successfully parsed JSON response")
            if isinstance(response_json, dict):
                print(f"📋 DEBUG: Response JSON keys: {list(response_json.keys())}")

        status_message = f"✅ Request sent successfully (HTTP {response.status_code})"
        print(f"🎯 DEBUG: Returning status_message: {status_message}")
        return json_payload_pretty, response.status_code, response_json, status_message

    except Exception as e:
        # Deliberate catch-all: callers rely on a tuple, never an exception.
        print(f"💥 DEBUG: Exception in send_request: {type(e).__name__}: {e}")
        traceback.print_exc()
        error_response = {"error": f"Error: {str(e)}"}
        return "", 500, error_response, f"❌ Request failed: {str(e)}"


class OpenFloorAgent:
    """Client-side handle for one remote Open Floor agent.

    Attributes are filled by :meth:`get_manifest`:
    ``manifest`` is the first manifest the agent published (or ``None``) and
    ``capabilities`` is always a list of that manifest's capabilities.
    """

    def __init__(self, url: str, name: Optional[str] = None):
        self.url = url
        self.name = name or url  # fall back to the URL as display name
        self.manifest = None
        self.capabilities = []

    @staticmethod
    def _extract_manifests(parameters):
        """Return the manifest list from event parameters, or ``None``.

        Accepts the three parameter names handled by this client:
        ``manifests``, ``servicingManifests`` and a single ``manifest``.
        """
        if "manifests" in parameters:
            manifests = parameters["manifests"]
            print(f"📋 DEBUG: Found 'manifests' key with {len(manifests)} items")
            return manifests
        if "servicingManifests" in parameters:
            manifests = parameters["servicingManifests"]
            print(f"📋 DEBUG: Found 'servicingManifests' key with {len(manifests)} items")
            return manifests
        if "manifest" in parameters:
            print("📋 DEBUG: Found single 'manifest' key, wrapping in array")
            return [parameters["manifest"]]
        return None

    @staticmethod
    def _normalize_capabilities(raw_capabilities):
        """Coerce a manifest's ``capabilities`` value into a list."""
        if isinstance(raw_capabilities, dict):
            return [raw_capabilities]  # single capability: wrap it
        if isinstance(raw_capabilities, list):
            return raw_capabilities
        return []

    def _apply_manifest_response(self, status_code, response_data) -> bool:
        """Store the first manifest found in a response; return True on success."""
        if status_code != 200 or not isinstance(response_data, dict):
            print(f"❌ DEBUG: status_code={status_code}, is_dict={isinstance(response_data, dict)}")
            return False
        print("✅ DEBUG: Got 200 response and dict data")

        if "openFloor" not in response_data:
            print("❌ DEBUG: No 'openFloor' key in response")
            return False
        print("✅ DEBUG: Found 'openFloor' key in response")

        open_floor = response_data["openFloor"]
        if "events" not in open_floor:
            print("❌ DEBUG: No 'events' key in openFloor")
            return False
        print("✅ DEBUG: Found 'events' key in openFloor")

        events = open_floor["events"]
        print(f"📋 DEBUG: Found {len(events)} events")

        for index, event in enumerate(events):
            print(f"📋 DEBUG: Event {index}: {event}")

            event_type = event.get("eventType")
            if event_type not in _MANIFEST_EVENT_TYPES:
                print(f"❌ DEBUG: Event type '{event_type}' is not a manifest event")
                continue
            print(f"✅ DEBUG: Found {event_type} event")

            parameters = event.get("parameters", {})
            manifests = self._extract_manifests(parameters)
            if not manifests:
                print(f"❌ DEBUG: No manifests found in parameters: {list(parameters.keys())}")
                continue

            # Validate before assigning so a malformed manifest cannot leave
            # the agent half-initialised.
            manifest = manifests[0]
            capabilities = self._normalize_capabilities(manifest.get("capabilities", []))
            self.manifest = manifest
            self.capabilities = capabilities
            print(f"✅ DEBUG: Successfully loaded manifest with {len(self.capabilities)} capabilities")
            return True

        return False

    def get_manifest(self):
        """Get the agent's manifest and capabilities.

        Returns:
            True when a manifest was received and stored on this instance,
            False otherwise (including when any exception occurs).
        """
        print(f"🎯 DEBUG: get_manifest called for {self.url}")

        try:
            _payload, status_code, response_data, _status_msg = send_request(self.url, "manifest")

            print(f"📊 DEBUG: send_request returned status_code={status_code}")
            print(f"📊 DEBUG: response_data type: {type(response_data)}")
            print(f"📊 DEBUG: response_data: {response_data}")

            if self._apply_manifest_response(status_code, response_data):
                return True

            print(f"❌ DEBUG: get_manifest returning False for {self.url}")
            return False

        except Exception as e:
            print(f"💥 DEBUG: Exception in get_manifest for {self.url}: {type(e).__name__}: {e}")
            traceback.print_exc()
            return False

    def send_utterance(self, message: str) -> str:
        """Send an utterance to the agent and get response.

        Returns:
            The ``value`` of the first text token of the first utterance event
            that carries tokens, or a human-readable explanation when the
            request fails or no text is found. Never raises.
        """
        try:
            _payload, status_code, response_data, _status_msg = send_request(
                self.url, "utterance", message
            )

            if status_code != 200:
                return f"Agent error: HTTP {status_code}"

            if isinstance(response_data, dict):
                if "openFloor" in response_data and "events" in response_data["openFloor"]:
                    for event in response_data["openFloor"]["events"]:
                        if event.get("eventType") != "utterance":
                            continue
                        dialog_event = event.get("parameters", {}).get("dialogEvent", {})
                        text_feature = dialog_event.get("features", {}).get("text", {})
                        tokens = text_feature.get("tokens", [])
                        if tokens:
                            return tokens[0].get("value", "No response")

            # HTTP 200 but nothing usable (including a non-dict JSON body).
            return f"Agent responded but no text found: {response_data}"

        except Exception as e:
            return f"Error communicating with agent: {e}"


class BuiltInFloorManager:
    """In-process registry that picks agents by their declared capabilities."""

    def __init__(self):
        # speakerUri -> {'openfloor_agent', 'manifest', 'url', 'capabilities',
        #                'status', 'last_seen'}
        self.agent_registry = {}
        self.active_conversations = {}

    def register_agent_from_openfloor_agent(self, openfloor_agent: OpenFloorAgent):
        """Add an agent with a loaded manifest to the registry.

        Returns:
            False when the agent has no manifest, True once it is registered.
            An existing entry with the same speaker URI is replaced.
        """
        if not openfloor_agent.manifest:
            return False

        speaker_uri = openfloor_agent.manifest.get("identification", {}).get("speakerUri")
        if not speaker_uri:
            # No speakerUri in the manifest: derive a key from the URL.
            bare_url = openfloor_agent.url.replace('https://', '').replace('http://', '')
            speaker_uri = f"tag:{bare_url},2025:agent"

        self.agent_registry[speaker_uri] = {
            'openfloor_agent': openfloor_agent,
            'manifest': openfloor_agent.manifest,
            'url': openfloor_agent.url,
            'capabilities': openfloor_agent.capabilities,
            'status': 'available',
            'last_seen': datetime.now(),
        }

        print(f"🏛️ Floor Manager: Registered {openfloor_agent.name}")
        return True

    def find_best_agent_for_task(self, task: str) -> Optional[OpenFloorAgent]:
        """Return the registered agent whose capabilities best match ``task``.

        Scoring: +2 for each keyphrase contained in the task text, +1 for each
        description that shares at least one word with it. When nothing scores
        above zero the first registered agent is returned; ``None`` is returned
        only when the registry is empty.
        """
        if not self.agent_registry:
            return None

        task_lower = task.lower()
        best_agent_info = None
        best_score = 0

        for agent_info in self.agent_registry.values():
            score = 0
            for capability in agent_info.get('capabilities', []):
                for keyphrase in _capability_field(capability, "keyphrases"):
                    # An empty keyphrase would be "contained" in every task.
                    if keyphrase and keyphrase.lower() in task_lower:
                        score += 2

                for description in _capability_field(capability, "descriptions"):
                    # NOTE(review): this is a substring test, so short words
                    # such as "a" or "to" in a description match almost any
                    # task. Kept as-is; tighten if ranking quality matters.
                    if any(word in task_lower for word in description.lower().split()):
                        score += 1

            if score > best_score:
                best_score = score
                best_agent_info = agent_info

        if best_agent_info is None:
            best_agent_info = next(iter(self.agent_registry.values()))

        return best_agent_info['openfloor_agent']

    def get_all_capabilities(self) -> Dict[str, List[Dict]]:
        """Map each registered agent's name to its capability list."""
        return {
            agent_info['openfloor_agent'].name: agent_info.get('capabilities', [])
            for agent_info in self.agent_registry.values()
        }

    @staticmethod
    def _capability_matches(capability: Any, keywords_lower: List[str]) -> bool:
        """True when any keyword occurs in a keyphrase or description."""
        texts = _capability_field(capability, "keyphrases") + _capability_field(
            capability, "descriptions"
        )
        return any(kw in text.lower() for text in texts for kw in keywords_lower)

    def find_agents_with_capability(self, capability_keywords: List[str]) -> List[OpenFloorAgent]:
        """Return agents with a keyphrase or description containing a keyword.

        Matching is case-insensitive substring search. Blank keywords are
        ignored because an empty string is a substring of everything and would
        otherwise select every agent. Each agent appears at most once, in
        registry order.
        """
        keywords_lower = [kw.lower() for kw in capability_keywords if kw and kw.strip()]
        if not keywords_lower:
            return []

        matching_agents = []
        for agent_info in self.agent_registry.values():
            agent = agent_info['openfloor_agent']
            if agent in matching_agents:
                continue
            capabilities = agent_info.get('capabilities', [])
            if any(self._capability_matches(cap, keywords_lower) for cap in capabilities):
                matching_agents.append(agent)

        return matching_agents


# NOTE(review): these are process-wide. Agents discovered from one caller's
# ``x-openfloor-agents`` header stay cached and are visible to every later
# caller, and the server issues POST requests to whatever URLs that header
# contains. Confirm this is acceptable for the deployment.
_agents_cache: Dict[str, OpenFloorAgent] = {}
_floor_manager = BuiltInFloorManager()
_initialized = False


def _load_and_register(url: str) -> Optional[OpenFloorAgent]:
    """Fetch the manifest for ``url``; cache and register the agent on success.

    Returns the agent, or ``None`` when no manifest could be obtained.
    """
    agent = OpenFloorAgent(url)
    if not agent.get_manifest():
        return None
    _agents_cache[url] = agent
    _floor_manager.register_agent_from_openfloor_agent(agent)
    return agent


def _ensure_initialization():
    """Ensure MCP server is properly initialized.

    On the first call, pre-loads every agent listed in ``OPENFLOOR_AGENTS``;
    later calls do nothing.
    """
    global _initialized
    if _initialized:
        return

    print("🔧 Initializing MCP-Open Floor Bridge...")

    env_urls = _split_csv(os.getenv("OPENFLOOR_AGENTS", ""))
    if env_urls:
        print(f"🌍 Pre-loading {len(env_urls)} agents from environment...")

        for url in env_urls:
            if url in _agents_cache:
                continue
            try:
                agent = _load_and_register(url)
            except Exception as e:
                print(f"❌ Error pre-loading agent {url}: {e}")
                continue
            if agent:
                print(f"✅ Pre-loaded agent: {agent.name} at {url}")
            else:
                print(f"❌ Failed to pre-load agent from: {url}")

    _initialized = True
    print("🚀 MCP-Open Floor Bridge initialized")


def _discover_agents_from_env_and_headers(request: gr.Request) -> Dict[str, OpenFloorAgent]:
    """Discover agents named by the environment and by the request header.

    URLs already present in the cache are not contacted again. Returns the
    shared module-level cache (URL -> agent), not a copy.
    """
    _ensure_initialization()

    agent_urls = []

    # Environment variable
    env_urls = _split_csv(os.getenv("OPENFLOOR_AGENTS", ""))
    if env_urls:
        agent_urls.extend(env_urls)
        print(f"🌍 Found {len(env_urls)} agents from OPENFLOOR_AGENTS env var")

    # Request headers
    headers = dict(request.headers) if request else {}
    if "x-openfloor-agents" in headers:
        header_urls = _split_csv(headers["x-openfloor-agents"])
        agent_urls.extend(header_urls)
        print(f"📡 Found {len(header_urls)} agents from x-openfloor-agents header")

    # Remove duplicates while keeping first-seen order.
    unique_urls = list(dict.fromkeys(agent_urls))
    print(f"🔍 Total unique agent URLs to discover: {len(unique_urls)}")

    # Register agents
    for url in unique_urls:
        if not url or url in _agents_cache:
            continue
        try:
            agent = _load_and_register(url)
        except Exception as e:
            print(f"❌ Error discovering agent {url}: {e}")
            continue
        if agent:
            print(f"✅ Discovered and registered agent: {agent.name} at {url}")
        else:
            print(f"❌ Failed to get manifest from: {url}")

    return _agents_cache


def discover_openfloor_agents(request: gr.Request) -> str:
    """Return a Markdown summary of every discovered agent."""
    agents = _discover_agents_from_env_and_headers(request)

    if not agents:
        return _no_agents_message("❌ No agents discovered.", include_both=True)

    parts = ["🤖 **Discovered Open Floor Agents:**\n\n"]

    for url, agent in agents.items():
        parts.append(f"**{agent.name}**\n")
        parts.append(f"- URL: {url}\n")

        if agent.manifest:
            identification = agent.manifest.get("identification", {})
            parts.append(f"- Role: {identification.get('role', 'Unknown')}\n")
            parts.append(f"- Synopsis: {identification.get('synopsis', 'No description')}\n")
            parts.append(f"- Capabilities: {len(agent.capabilities)}\n")

            for cap in agent.capabilities:
                keyphrases = _capability_field(cap, "keyphrases")
                descriptions = _capability_field(cap, "descriptions")
                if keyphrases:
                    parts.append(f" • Keywords: {', '.join(keyphrases)}\n")
                if descriptions:
                    parts.append(f" • Description: {', '.join(descriptions)}\n")

        parts.append("\n")

    return "".join(parts)


def send_message_to_openfloor_agent(agent_url: str, message: str, request: gr.Request) -> str:
    """Send ``message`` to the discovered agent at ``agent_url``; return its reply."""
    agents = _discover_agents_from_env_and_headers(request)

    if agent_url not in agents:
        return f"❌ Agent not found: {agent_url}. Please discover agents first or check the URL."

    return agents[agent_url].send_utterance(message)


def send_to_best_openfloor_agent(task_description: str, request: gr.Request) -> str:
    """Send the task to the agent the floor manager scores highest for it."""
    _discover_agents_from_env_and_headers(request)

    best_agent = _floor_manager.find_best_agent_for_task(task_description)

    if not best_agent:
        return _no_agents_message("❌ No agents available.")

    response = best_agent.send_utterance(task_description)
    return f"🏛️ **Floor Manager Selected: {best_agent.name}**\n\n🤖 **Response**: {response}"


def execute_agent_capability(capability_keywords: str, task_request: str, request: gr.Request) -> str:
    """Send ``task_request`` to the first agent matching the given keywords.

    ``capability_keywords`` is a comma-separated list; blank entries are
    ignored. When nothing matches, the reply lists up to five keyphrases per
    known agent.
    """
    _discover_agents_from_env_and_headers(request)

    keywords = _split_csv(capability_keywords)
    matching_agents = _floor_manager.find_agents_with_capability(keywords)

    if not matching_agents:
        summary_parts = []
        for agent_name, capabilities in _floor_manager.get_all_capabilities().items():
            all_keywords = []
            for cap in capabilities:
                all_keywords.extend(_capability_field(cap, "keyphrases"))
            summary_parts.append(f"\n- {agent_name}: " + ", ".join(all_keywords[:5]))
        capability_summary = "".join(summary_parts)

        return (
            f"❌ No agents found with capabilities matching: {capability_keywords}"
            f"\n\nAvailable capabilities:{capability_summary}"
        )

    selected_agent = matching_agents[0]
    response = selected_agent.send_utterance(task_request)

    return (
        f"🎯 **Selected Agent: {selected_agent.name}** (matched: {capability_keywords})"
        f"\n\n🤖 **Response**: {response}"
    )


def list_all_agent_capabilities(request: gr.Request) -> str:
    """Return a Markdown listing of every agent's capabilities."""
    _discover_agents_from_env_and_headers(request)

    all_capabilities = _floor_manager.get_all_capabilities()

    if not all_capabilities:
        return _no_agents_message("❌ No agents discovered.")

    parts = ["🎯 **All Agent Capabilities:**\n\n"]

    for agent_name, capabilities in all_capabilities.items():
        parts.append(f"**{agent_name}:**\n")

        if not capabilities:
            parts.append(" - No capabilities defined\n\n")
            continue

        for number, capability in enumerate(capabilities, 1):
            keyphrases = _capability_field(capability, "keyphrases")
            descriptions = _capability_field(capability, "descriptions")
            languages = _capability_field(capability, "languages")

            parts.append(f" **Capability {number}:**\n")
            if keyphrases:
                parts.append(f" • Keywords: {', '.join(keyphrases)}\n")
            if descriptions:
                parts.append(f" • Can do: {', '.join(descriptions)}\n")
            if languages:
                parts.append(f" • Languages: {', '.join(languages)}\n")
            parts.append("\n")

        parts.append("\n")

    return "".join(parts)


def send_task_to_agents_with_keywords(keywords: str, task: str, request: gr.Request) -> str:
    """Send ``task`` to every agent matching the comma-separated ``keywords``."""
    _discover_agents_from_env_and_headers(request)

    keyword_list = _split_csv(keywords)
    matching_agents = _floor_manager.find_agents_with_capability(keyword_list)

    if not matching_agents:
        return f"❌ No agents found with capabilities matching: {keywords}"

    parts = [f"🎯 **Found {len(matching_agents)} agents matching '{keywords}':**\n\n"]

    for agent in matching_agents:
        response = agent.send_utterance(task)
        parts.append(f"**{agent.name}:** {response}\n\n")

    return "".join(parts)


with gr.Blocks(title="MCP-Open Floor Bridge", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "\n".join(
            [
                "# 🌉 MCP-Open Floor Bridge Server",
                "",
                "**🆕 Please use the new version available under "
                "[https://mcp.openfloor.dev](https://mcp.openfloor.dev).**",
                "",
                "## Setup:",
                '- **Environment Variable**: '
                '`export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`',
                "- **Request Header**: "
                "`x-openfloor-agents: https://agent1.com,https://agent2.com`",
            ]
        )
    )

    # Environment variable status check
    env_agents = os.getenv("OPENFLOOR_AGENTS", "")
    if env_agents:
        agent_urls = _split_csv(env_agents)
        agent_list = '\n'.join([f"- {url}" for url in agent_urls])
        gr.Markdown("### ✅ Environment Variable Status")
        gr.Markdown(f"**OPENFLOOR_AGENTS** is set with **{len(agent_urls)} agent(s)**:")
        gr.Markdown(agent_list)
    else:
        gr.Markdown(
            "\n".join(
                [
                    "### ⚠️ Environment Variable Status",
                    "**OPENFLOOR_AGENTS** is not set. "
                    "Agents will be discovered from request headers only.",
                ]
            )
        )

    # Expose functions as MCP tools
    gr.api(discover_openfloor_agents)
    gr.api(send_message_to_openfloor_agent)
    gr.api(send_to_best_openfloor_agent)
    gr.api(execute_agent_capability)
    gr.api(list_all_agent_capabilities)
    gr.api(send_task_to_agents_with_keywords)


if __name__ == "__main__":
    demo.launch(
        share=False,
        debug=False,
        mcp_server=True,
    )