|
import gradio as gr |
|
import requests |
|
import json |
|
import os |
|
from datetime import datetime |
|
from typing import List, Dict, Any, Optional |
|
from openfloor import ( |
|
Envelope, Conversation, Sender, DialogEvent, TextFeature, |
|
UtteranceEvent, GetManifestsEvent, To |
|
) |
|
from openfloor.manifest import * |
|
from openfloor.envelope import * |
|
|
|
|
|
def build_manifest_request_envelope(agent_url):
    """Build an envelope asking the agent at ``agent_url`` for its manifests.

    The envelope gets a fresh ``Conversation`` and a fixed test-client sender
    (``openfloor://localhost/TestClient`` served from ``http://localhost``).
    A single ``GetManifestsEvent`` addressed, non-privately, to ``agent_url``
    is appended.  Progress is traced with DEBUG prints.

    Returns the populated ``Envelope``.
    """
    print(f"ποΈ DEBUG: Building manifest envelope for {agent_url}")

    convo = Conversation()
    client = Sender(speakerUri="openfloor://localhost/TestClient", serviceUrl="http://localhost")
    print("ποΈ DEBUG: Created sender with speakerUri='openfloor://localhost/TestClient'")

    request_envelope = Envelope(conversation=convo, sender=client)
    print("ποΈ DEBUG: Created envelope")

    recipient = To(serviceUrl=agent_url, private=False)
    request_envelope.events.append(GetManifestsEvent(to=recipient))
    print(f"ποΈ DEBUG: Added GetManifestsEvent targeting {agent_url}")

    return request_envelope
|
|
|
|
|
def build_utterance_envelope(agent_url, message_text):
    """Build an envelope carrying ``message_text`` as an utterance for ``agent_url``.

    The envelope uses a fresh ``Conversation`` and the fixed test-client sender
    (``openfloor://localhost/TestClient`` served from ``http://localhost``).
    The text is wrapped in a ``DialogEvent`` whose ``"text"`` feature holds the
    message, and the resulting ``UtteranceEvent`` is addressed, non-privately,
    to ``agent_url``.

    Returns the populated ``Envelope``.
    """
    convo = Conversation()
    client = Sender(speakerUri="openfloor://localhost/TestClient", serviceUrl="http://localhost")
    outgoing = Envelope(conversation=convo, sender=client)

    spoken = DialogEvent(
        speakerUri="openfloor://localhost/TestClient",
        features={"text": TextFeature(values=[message_text])},
    )
    recipient = To(serviceUrl=agent_url, private=False)
    outgoing.events.append(UtteranceEvent(dialogEvent=spoken, to=recipient))

    return outgoing
|
|
|
|
|
def send_request(agent_url, request_type, message_text="Hello, this is a test message!"):
    """POST an Open Floor envelope to ``agent_url`` and report what came back.

    A ``request_type`` of ``"manifest"`` sends a manifest request; any other
    value sends ``message_text`` as an utterance.  Every step is traced with
    DEBUG prints, including the full request payload.

    Returns a 4-tuple ``(pretty_request_json, http_status, response_body,
    status_message)``.  ``response_body`` is the decoded JSON reply, or
    ``{"raw_response": <text>}`` when the reply cannot be parsed as JSON.
    If anything raises, the traceback is printed and the tuple is
    ``("", 500, {"error": ...}, <failure message>)`` instead.
    """
    print(f"π DEBUG: send_request called with agent_url='{agent_url}', request_type='{request_type}'")
    try:
        if request_type != "manifest":
            outgoing = build_utterance_envelope(agent_url, message_text)
            print(f"π¦ DEBUG: Built utterance envelope for {agent_url}")
        else:
            outgoing = build_manifest_request_envelope(agent_url)
            print(f"π¦ DEBUG: Built manifest envelope for {agent_url}")

        serialized = outgoing.to_json()
        print(f"π DEBUG: Envelope JSON length: {len(serialized)} chars")
        print(f"π DEBUG: Full envelope JSON: {serialized}")

        # The envelope is wrapped under an "openFloor" key; the pretty-printed
        # form is only for logging and for the caller's first return value.
        body = {"openFloor": json.loads(serialized)}
        pretty_body = json.dumps(body, indent=2)
        print(f"π€ DEBUG: Full request payload: {pretty_body}")
        print(f"π€ DEBUG: Request payload preview: {pretty_body[:200]}...")

        request_headers = {'Content-Type': 'application/json'}
        print(f"π DEBUG: Sending {request_type} request to {agent_url}")
        print(f"π§ DEBUG: Headers: {request_headers}")

        reply = requests.post(agent_url, json=body, headers=request_headers, timeout=10)

        print(f"π₯ DEBUG: Response status: {reply.status_code}")
        print(f"π₯ DEBUG: Response headers: {dict(reply.headers)}")
        print(f"π₯ DEBUG: Response content length: {len(reply.text)} chars")
        print(f"π₯ DEBUG: Response preview: {reply.text[:300]}...")

        try:
            parsed = reply.json()
            print("β DEBUG: Successfully parsed JSON response")
            if isinstance(parsed, dict):
                print(f"π DEBUG: Response JSON keys: {list(parsed.keys())}")
        except Exception as parse_error:
            # Non-JSON replies are still handed back, wrapped as raw text.
            print(f"β DEBUG: JSON parse error: {parse_error}")
            parsed = {"raw_response": reply.text}

        status_message = f"β Request sent successfully (HTTP {reply.status_code})"
        print(f"π― DEBUG: Returning status_message: {status_message}")
        return pretty_body, reply.status_code, parsed, status_message

    except Exception as exc:
        print(f"π₯ DEBUG: Exception in send_request: {type(exc).__name__}: {exc}")
        import traceback
        traceback.print_exc()
        failure = {"error": f"Error: {str(exc)}"}
        return "", 500, failure, f"β Request failed: {str(exc)}"
|
|
|
class OpenFloorAgent:
    """Client-side handle for one remote Open Floor agent.

    Instances start with only a URL and a display name; ``get_manifest``
    fills in ``manifest`` and ``capabilities`` from the agent's reply.
    """

    def __init__(self, url: str, name: Optional[str] = None):
        """Remember the agent's ``url``; ``name`` falls back to the URL."""
        self.url = url
        self.name = name or url
        # Populated by get_manifest(); None / empty until it succeeds.
        self.manifest = None
        self.capabilities = []

    @staticmethod
    def _manifests_from_parameters(parameters):
        """Return the first non-empty manifest list in ``parameters``, else None.

        Keys are tried in the order ``manifests``, ``servicingManifests``,
        ``manifest``.  A key that is present but empty no longer stops the
        search, so a populated later key is still found.
        """
        for key in ("manifests", "servicingManifests"):
            if key in parameters:
                candidate = parameters[key] or []
                print(f"π DEBUG: Found '{key}' key with {len(candidate)} items")
                if candidate:
                    return candidate
        if parameters.get("manifest"):
            print("π DEBUG: Found single 'manifest' key, wrapping in array")
            return [parameters["manifest"]]
        return None

    def _load_manifest_from_response(self, status_code, response_data) -> bool:
        """Store manifest and capabilities from a manifest reply.

        Returns True once a ``publishManifests``/``publishManifest`` event with
        a usable manifest has been stored on ``self``; False otherwise.  Each
        decision is traced with a DEBUG print.
        """
        if status_code != 200 or not isinstance(response_data, dict):
            print(f"β DEBUG: status_code={status_code}, is_dict={isinstance(response_data, dict)}")
            return False
        print("β DEBUG: Got 200 response and dict data")

        if "openFloor" not in response_data:
            print("β DEBUG: No 'openFloor' key in response")
            return False
        print("β DEBUG: Found 'openFloor' key in response")

        open_floor = response_data["openFloor"]
        if "events" not in open_floor:
            print("β DEBUG: No 'events' key in openFloor")
            return False
        print("β DEBUG: Found 'events' key in openFloor")

        events = open_floor["events"]
        print(f"π DEBUG: Found {len(events)} events")
        for index, event in enumerate(events):
            print(f"π DEBUG: Event {index}: {event}")

            event_type = event.get("eventType")
            if event_type not in ("publishManifests", "publishManifest"):
                print(f"β DEBUG: Event type '{event_type}' is not a manifest event")
                continue
            print(f"β DEBUG: Found {event_type} event")

            parameters = event.get("parameters", {})
            manifests = self._manifests_from_parameters(parameters)
            # Only a dict manifest is usable; checking before assignment keeps
            # self.manifest from being left pointing at an unusable value.
            if not manifests or not isinstance(manifests[0], dict):
                print(f"β DEBUG: No manifests found in parameters: {list(parameters.keys())}")
                continue

            self.manifest = manifests[0]

            # Capabilities may arrive as one object or as a list of objects.
            raw_capabilities = self.manifest.get("capabilities", [])
            if isinstance(raw_capabilities, dict):
                self.capabilities = [raw_capabilities]
            elif isinstance(raw_capabilities, list):
                self.capabilities = raw_capabilities
            else:
                self.capabilities = []
            print(f"β DEBUG: Successfully loaded manifest with {len(self.capabilities)} capabilities")
            return True

        return False

    def get_manifest(self):
        """Get the agent's manifest and capabilities.

        Sends a manifest request to ``self.url`` and, on success, stores the
        first returned manifest in ``self.manifest`` and its capabilities (always
        as a list) in ``self.capabilities``.

        Returns True when a manifest was loaded, False otherwise.  Exceptions
        are caught, printed with a traceback, and reported as False.
        """
        print(f"π― DEBUG: get_manifest called for {self.url}")
        try:
            _payload, status_code, response_data, _status_msg = send_request(self.url, "manifest")

            print(f"π DEBUG: send_request returned status_code={status_code}")
            print(f"π DEBUG: response_data type: {type(response_data)}")
            print(f"π DEBUG: response_data: {response_data}")

            if self._load_manifest_from_response(status_code, response_data):
                return True

            print(f"β DEBUG: get_manifest returning False for {self.url}")
            return False
        except Exception as e:
            print(f"π₯ DEBUG: Exception in get_manifest for {self.url}: {type(e).__name__}: {e}")
            import traceback
            traceback.print_exc()
            return False

    def send_utterance(self, message: str) -> str:
        """Send an utterance to the agent and get response.

        Returns the ``value`` of the first text token of the first utterance
        event that has tokens.  Otherwise returns a diagnostic string: an
        "Agent error" message when the reply is not an HTTP 200 dict, an
        "Agent responded but no text found" message when no text token exists,
        or an "Error communicating" message when an exception is raised.
        """
        try:
            _payload, status_code, response_data, _status_msg = send_request(self.url, "utterance", message)

            if status_code != 200 or not isinstance(response_data, dict):
                return f"Agent error: HTTP {status_code}"

            if "openFloor" in response_data and "events" in response_data["openFloor"]:
                for event in response_data["openFloor"]["events"]:
                    if event.get("eventType") != "utterance":
                        continue
                    dialog_event = event.get("parameters", {}).get("dialogEvent", {})
                    text_feature = dialog_event.get("features", {}).get("text", {})
                    tokens = text_feature.get("tokens", [])
                    if tokens:
                        return tokens[0].get("value", "No response")

            return f"Agent responded but no text found: {response_data}"

        except Exception as e:
            return f"Error communicating with agent: {e}"
|
|
|
class BuiltInFloorManager:
    """In-memory registry of Open Floor agents with keyword-based routing."""

    def __init__(self):
        # speakerUri -> registration record (agent object, manifest, url, ...)
        self.agent_registry = {}
        self.active_conversations = {}

    @staticmethod
    def _words(text):
        """Return the set of lowercase alphanumeric words in ``text``."""
        normalized = "".join(ch if ch.isalnum() else " " for ch in text.lower())
        return set(normalized.split())

    def register_agent_from_openfloor_agent(self, openfloor_agent: "OpenFloorAgent"):
        """Add ``openfloor_agent`` to the registry, keyed by its speaker URI.

        The key is the manifest's ``identification.speakerUri``; when that is
        missing, a ``tag:<url without scheme>,2025:agent`` URI is derived from
        the agent URL.  Returns False (and registers nothing) when the agent has
        no manifest, True otherwise.
        """
        if not openfloor_agent.manifest:
            return False

        speaker_uri = openfloor_agent.manifest.get("identification", {}).get("speakerUri")
        if not speaker_uri:
            speaker_uri = f"tag:{openfloor_agent.url.replace('https://', '').replace('http://', '')},2025:agent"

        self.agent_registry[speaker_uri] = {
            'openfloor_agent': openfloor_agent,
            'manifest': openfloor_agent.manifest,
            'url': openfloor_agent.url,
            'capabilities': openfloor_agent.capabilities,
            'status': 'available',
            'last_seen': datetime.now(),
        }
        print(f"ποΈ Floor Manager: Registered {openfloor_agent.name}")
        return True

    def find_best_agent_for_task(self, task: str) -> Optional["OpenFloorAgent"]:
        """Pick the registered agent whose capabilities best match ``task``.

        Scoring per capability: +2 for every non-empty keyphrase that occurs in
        the task text, +1 for every description that shares at least one whole
        word with the task.  The highest positive score wins (first registered
        wins ties); if nothing scores, the first registered agent is returned.
        Returns None when the registry is empty.
        """
        if not self.agent_registry:
            return None

        task_lower = task.lower()
        # Whole-word comparison: a description word such as "a" or "art" must
        # not count as a hit merely because it appears inside another word.
        task_words = self._words(task)

        best_agent_info = None
        best_score = 0
        for agent_info in self.agent_registry.values():
            score = 0
            for capability in agent_info.get('capabilities', []):
                for keyphrase in capability.get("keyphrases", []):
                    # An empty keyphrase is a substring of everything; skip it.
                    if keyphrase and keyphrase.lower() in task_lower:
                        score += 2
                for description in capability.get("descriptions", []):
                    if task_words & self._words(description):
                        score += 1

            if score > best_score:
                best_score = score
                best_agent_info = agent_info

        if best_agent_info is None:
            # Nothing matched: fall back to the first registered agent.
            best_agent_info = next(iter(self.agent_registry.values()))

        return best_agent_info['openfloor_agent']

    def get_all_capabilities(self) -> Dict[str, List[Dict]]:
        """Return a mapping of agent name to that agent's capability list."""
        return {
            agent_info['openfloor_agent'].name: agent_info.get('capabilities', [])
            for agent_info in self.agent_registry.values()
        }

    def find_agents_with_capability(self, capability_keywords: List[str]) -> List["OpenFloorAgent"]:
        """Return agents with a keyphrase or description containing any keyword.

        Matching is case-insensitive substring containment.  Blank keywords are
        ignored, because an empty string is contained in every string and would
        otherwise match every agent.  Agents are returned in registration order,
        each at most once; an empty list is returned when no usable keyword
        remains.
        """
        keywords_lower = [kw.strip().lower() for kw in capability_keywords if kw and kw.strip()]
        if not keywords_lower:
            return []

        matching_agents = []
        for agent_info in self.agent_registry.values():
            agent = agent_info['openfloor_agent']
            if agent in matching_agents:
                continue

            searchable = [
                text.lower()
                for capability in agent_info.get('capabilities', [])
                for text in (*capability.get("keyphrases", []), *capability.get("descriptions", []))
            ]
            if any(kw in text for text in searchable for kw in keywords_lower):
                matching_agents.append(agent)

        return matching_agents
|
|
|
# Process-wide state shared by every API call in this module.
_initialized = False  # set to True by _ensure_initialization() after its first run
_floor_manager = BuiltInFloorManager()  # registry used to route tasks to agents
_agents_cache: Dict[str, OpenFloorAgent] = dict()  # agent URL -> agent with a loaded manifest
|
|
|
def _ensure_initialization():
    """Run one-time start-up: pre-load the agents named in ``OPENFLOOR_AGENTS``.

    On the first call, every comma-separated URL in the ``OPENFLOOR_AGENTS``
    environment variable that is not already cached is asked for its manifest;
    agents that answer are cached and registered with the floor manager.
    Failures are printed and skipped.  Later calls return immediately.
    """
    global _initialized
    if _initialized:
        return

    print("π§ Initializing MCP-Open Floor Bridge...")

    configured = os.getenv("OPENFLOOR_AGENTS", "")
    if configured:
        env_urls = [piece.strip() for piece in configured.split(",") if piece.strip()]
        print(f"π Pre-loading {len(env_urls)} agents from environment...")
        for url in env_urls:
            if not url or url in _agents_cache:
                continue
            try:
                candidate = OpenFloorAgent(url)
                if not candidate.get_manifest():
                    print(f"β Failed to pre-load agent from: {url}")
                else:
                    _agents_cache[url] = candidate
                    _floor_manager.register_agent_from_openfloor_agent(candidate)
                    print(f"β Pre-loaded agent: {candidate.name} at {url}")
            except Exception as exc:
                print(f"β Error pre-loading agent {url}: {exc}")

    _initialized = True
    print("π MCP-Open Floor Bridge initialized")
|
|
|
def _discover_agents_from_env_and_headers(request: gr.Request) -> Dict[str, OpenFloorAgent]:
    """Discover agents named in the environment and in the request headers.

    Candidate URLs come from the ``OPENFLOOR_AGENTS`` environment variable and
    from the ``x-openfloor-agents`` request header (both comma-separated),
    de-duplicated in first-seen order.  Each URL not yet cached is asked for
    its manifest; agents that answer are cached and registered with the floor
    manager.  Failures are printed and skipped.

    Returns the module-level cache, so it also contains agents discovered by
    earlier calls.
    """
    _ensure_initialization()

    candidates = []

    configured = os.getenv("OPENFLOOR_AGENTS", "")
    if configured:
        env_urls = [piece.strip() for piece in configured.split(",") if piece.strip()]
        candidates += env_urls
        print(f"π Found {len(env_urls)} agents from OPENFLOOR_AGENTS env var")

    # NOTE(review): header-supplied URLs are contacted as-is by get_manifest();
    # confirm that callers of this endpoint are trusted to name arbitrary hosts.
    headers = dict(request.headers) if request else {}
    if "x-openfloor-agents" in headers:
        header_urls = [piece.strip() for piece in headers["x-openfloor-agents"].split(",") if piece.strip()]
        candidates += header_urls
        print(f"π‘ Found {len(header_urls)} agents from x-openfloor-agents header")

    # dict.fromkeys keeps the first occurrence of each URL, in order.
    unique_urls = list(dict.fromkeys(candidates))
    print(f"π Total unique agent URLs to discover: {len(unique_urls)}")

    for url in unique_urls:
        if not url or url in _agents_cache:
            continue
        try:
            candidate = OpenFloorAgent(url)
            if not candidate.get_manifest():
                print(f"β Failed to get manifest from: {url}")
            else:
                _agents_cache[url] = candidate
                _floor_manager.register_agent_from_openfloor_agent(candidate)
                print(f"β Discovered and registered agent: {candidate.name} at {url}")
        except Exception as exc:
            print(f"β Error discovering agent {url}: {exc}")

    return _agents_cache
|
|
|
def discover_openfloor_agents(request: gr.Request) -> str:
    """Return a Markdown summary of every discovered Open Floor agent.

    For each agent the summary lists its name and URL, the role and synopsis
    from the manifest's ``identification`` section (when a manifest is
    present), the number of capabilities, and each capability's keyphrases and
    descriptions.  When no agent is known, setup instructions are returned
    instead.
    """
    agents = _discover_agents_from_env_and_headers(request)

    if not agents:
        return (
            "β No agents discovered.\n"
            "\n"
            "**Setup Options:**\n"
            '1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`\n'
            "2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`\n"
            "3. **Both**: Environment + header URLs will be combined"
        )

    parts = ["π€ **Discovered Open Floor Agents:**\n\n"]
    for url, agent in agents.items():
        parts.append(f"**{agent.name}**\n")
        parts.append(f"- URL: {url}\n")
        if agent.manifest:
            identification = agent.manifest.get("identification", {})
            parts.append(f"- Role: {identification.get('role', 'Unknown')}\n")
            parts.append(f"- Synopsis: {identification.get('synopsis', 'No description')}\n")

        parts.append(f"- Capabilities: {len(agent.capabilities)}\n")
        for capability in agent.capabilities:
            keyphrases = capability.get("keyphrases", [])
            descriptions = capability.get("descriptions", [])
            if keyphrases:
                parts.append(f" β’ Keywords: {', '.join(keyphrases)}\n")
            if descriptions:
                parts.append(f" β’ Description: {', '.join(descriptions)}\n")
        parts.append("\n")

    return "".join(parts)
|
|
|
def send_message_to_openfloor_agent(agent_url: str, message: str, request: gr.Request) -> str:
    """Send ``message`` to the discovered agent at ``agent_url``.

    Returns the agent's reply text, or an error string when ``agent_url`` is
    not among the discovered agents.
    """
    known_agents = _discover_agents_from_env_and_headers(request)

    target = known_agents.get(agent_url) if agent_url in known_agents else None
    if agent_url not in known_agents:
        return f"β Agent not found: {agent_url}. Please discover agents first or check the URL."

    return target.send_utterance(message)
|
|
|
def send_to_best_openfloor_agent(task_description: str, request: gr.Request) -> str:
    """Route ``task_description`` to the agent the floor manager picks for it.

    Returns a Markdown string naming the selected agent followed by its reply,
    or setup instructions when the floor manager has no agent to offer.
    """
    _discover_agents_from_env_and_headers(request)

    chosen = _floor_manager.find_best_agent_for_task(task_description)
    if not chosen:
        return (
            "β No agents available.\n"
            "\n"
            "**Setup Options:**\n"
            '1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`\n'
            "2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`"
        )

    reply = chosen.send_utterance(task_description)
    return f"ποΈ **Floor Manager Selected: {chosen.name}**\n\nπ€ **Response**: {reply}"
|
|
|
def execute_agent_capability(capability_keywords: str, task_request: str, request: gr.Request) -> str:
    """Send ``task_request`` to the first agent matching ``capability_keywords``.

    ``capability_keywords`` is a comma-separated list.  Blank entries (an empty
    string, a trailing comma, doubled commas) are dropped before matching,
    because an empty keyword is contained in every keyphrase and would select
    an arbitrary agent.

    Returns a Markdown string naming the selected agent and its reply.  When no
    agent matches, returns a message listing up to five keyphrases per
    registered agent.
    """
    _discover_agents_from_env_and_headers(request)

    keywords = [kw.strip() for kw in capability_keywords.split(",") if kw.strip()]
    matching_agents = _floor_manager.find_agents_with_capability(keywords)

    if not matching_agents:
        summary_lines = []
        for agent_name, capabilities in _floor_manager.get_all_capabilities().items():
            all_keywords = []
            for cap in capabilities:
                all_keywords.extend(cap.get("keyphrases", []))
            # Keep the hint short: at most five keyphrases per agent.
            summary_lines.append(f"\n- {agent_name}: " + ", ".join(all_keywords[:5]))
        capability_summary = "".join(summary_lines)

        return f"β No agents found with capabilities matching: {capability_keywords}\n\nAvailable capabilities:{capability_summary}"

    selected_agent = matching_agents[0]
    response = selected_agent.send_utterance(task_request)

    return f"π― **Selected Agent: {selected_agent.name}** (matched: {capability_keywords})\n\nπ€ **Response**: {response}"
|
|
|
def list_all_agent_capabilities(request: gr.Request) -> str:
    """Return a Markdown listing of every registered agent's capabilities.

    Each capability is numbered from 1 and shows its keyphrases, descriptions
    and languages when present.  Agents without capabilities are listed as
    such.  When no agent is registered, setup instructions are returned
    instead.
    """
    _discover_agents_from_env_and_headers(request)

    capabilities_by_agent = _floor_manager.get_all_capabilities()
    if not capabilities_by_agent:
        return (
            "β No agents discovered.\n"
            "\n"
            "**Setup Options:**\n"
            '1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`\n'
            "2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`"
        )

    parts = ["π― **All Agent Capabilities:**\n\n"]
    for agent_name, capabilities in capabilities_by_agent.items():
        parts.append(f"**{agent_name}:**\n")

        if not capabilities:
            parts.append(" - No capabilities defined\n\n")
            continue

        for number, capability in enumerate(capabilities, 1):
            parts.append(f" **Capability {number}:**\n")
            keyphrases = capability.get("keyphrases", [])
            if keyphrases:
                parts.append(f" β’ Keywords: {', '.join(keyphrases)}\n")
            descriptions = capability.get("descriptions", [])
            if descriptions:
                parts.append(f" β’ Can do: {', '.join(descriptions)}\n")
            languages = capability.get("languages", [])
            if languages:
                parts.append(f" β’ Languages: {', '.join(languages)}\n")
            parts.append("\n")

        parts.append("\n")

    return "".join(parts)
|
|
|
def send_task_to_agents_with_keywords(keywords: str, task: str, request: gr.Request) -> str:
    """Send ``task`` to every agent whose capabilities match ``keywords``.

    ``keywords`` is a comma-separated list.  Blank entries (an empty string, a
    trailing comma, doubled commas) are dropped before matching, because an
    empty keyword is contained in every keyphrase and would broadcast the task
    to every agent.

    Returns a Markdown string with one reply per matching agent, or an error
    string when no agent matches.
    """
    _discover_agents_from_env_and_headers(request)

    keyword_list = [kw.strip() for kw in keywords.split(",") if kw.strip()]
    matching_agents = _floor_manager.find_agents_with_capability(keyword_list)

    if not matching_agents:
        return f"β No agents found with capabilities matching: {keywords}"

    parts = [f"π― **Found {len(matching_agents)} agents matching '{keywords}':**\n\n"]
    for agent in matching_agents:
        response = agent.send_utterance(task)
        parts.append(f"**{agent.name}:** {response}\n\n")

    return "".join(parts)
|
|
|
# Gradio app: a static status page plus the API endpoints exposed to clients.
with gr.Blocks(title="MCP-Open Floor Bridge", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "\n"
        "# π MCP-Open Floor Bridge Server\n"
        "\n"
        "**π Please use the new version available under [https://mcp.openfloor.dev](https://mcp.openfloor.dev).**\n"
        "\n"
        "## Setup:\n"
        '- **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`\n'
        "- **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`\n"
    )

    # Show which agents, if any, were configured through the environment.
    env_agents = os.getenv("OPENFLOOR_AGENTS", "")
    if not env_agents:
        gr.Markdown(
            "\n"
            "### β οΈ Environment Variable Status\n"
            "**OPENFLOOR_AGENTS** is not set. Agents will be discovered from request headers only.\n"
        )
    else:
        agent_urls = [url.strip() for url in env_agents.split(",") if url.strip()]
        agent_list = '\n'.join(f"- {url}" for url in agent_urls)
        gr.Markdown("### β Environment Variable Status")
        gr.Markdown(f"**OPENFLOOR_AGENTS** is set with **{len(agent_urls)} agent(s)**:")
        gr.Markdown(agent_list)

    # Register each public function as an API endpoint, in the original order.
    gr.api(discover_openfloor_agents)
    gr.api(send_message_to_openfloor_agent)
    gr.api(send_to_best_openfloor_agent)
    gr.api(execute_agent_capability)
    gr.api(list_all_agent_capabilities)
    gr.api(send_task_to_agents_with_keywords)
|
|
|
# Script entry point: serve the app locally (no public share link, debug off)
# with the MCP server enabled.
if __name__ == "__main__":
    demo.launch(share=False, debug=False, mcp_server=True)