# Source: ofp_mcp / app.py (Hugging Face Space file page).
# Last commit: 22c5e7e (verified) by azettl, "Update app.py".
import gradio as gr
import requests
import json
import os
from datetime import datetime
from typing import List, Dict, Any, Optional
from openfloor import (
Envelope, Conversation, Sender, DialogEvent, TextFeature,
UtteranceEvent, GetManifestsEvent, To
)
from openfloor.manifest import *
from openfloor.envelope import *
def build_manifest_request_envelope(agent_url):
    """Build an Open Floor envelope that asks an agent for its manifests.

    The envelope is sent under the fixed test-client identity and carries one
    ``GetManifestsEvent`` addressed (non-privately) to ``agent_url``.

    Args:
        agent_url: Service URL of the agent the event is addressed to.

    Returns:
        The populated ``Envelope``.
    """
    print(f"πŸ—οΈ DEBUG: Building manifest envelope for {agent_url}")

    # Conversation first, then the sender identity used for every request.
    new_conversation = Conversation()
    test_client = Sender(
        speakerUri="openfloor://localhost/TestClient",
        serviceUrl="http://localhost",
    )
    print(f"πŸ—οΈ DEBUG: Created sender with speakerUri='openfloor://localhost/TestClient'")

    request_envelope = Envelope(conversation=new_conversation, sender=test_client)
    print(f"πŸ—οΈ DEBUG: Created envelope")

    # Address the manifest request to the target agent.
    target = To(serviceUrl=agent_url, private=False)
    request_envelope.events.append(GetManifestsEvent(to=target))
    print(f"πŸ—οΈ DEBUG: Added GetManifestsEvent targeting {agent_url}")

    return request_envelope
def build_utterance_envelope(agent_url, message_text):
    """Build an Open Floor envelope carrying one text utterance.

    Args:
        agent_url: Service URL of the agent the utterance is addressed to.
        message_text: Text placed in the dialog event's ``text`` feature.

    Returns:
        The populated ``Envelope`` containing a single ``UtteranceEvent``.
    """
    # Conversation first, then the sender identity used for every request.
    new_conversation = Conversation()
    test_client = Sender(
        speakerUri="openfloor://localhost/TestClient",
        serviceUrl="http://localhost",
    )
    request_envelope = Envelope(conversation=new_conversation, sender=test_client)

    # The spoken content: a dialog event whose only feature is the text.
    text_features = {"text": TextFeature(values=[message_text])}
    spoken = DialogEvent(
        speakerUri="openfloor://localhost/TestClient",
        features=text_features,
    )

    # Wrap the dialog event in an utterance addressed to the target agent.
    target = To(serviceUrl=agent_url, private=False)
    request_envelope.events.append(UtteranceEvent(dialogEvent=spoken, to=target))

    return request_envelope
def send_request(agent_url, request_type, message_text="Hello, this is a test message!"):
    """POST a manifest or utterance envelope to an agent and report the outcome.

    Args:
        agent_url: URL the envelope is POSTed to (also the event's target).
        request_type: ``"manifest"`` builds a manifest request; any other
            value builds an utterance.
        message_text: Utterance text; unused for manifest requests.

    Returns:
        A 4-tuple ``(request_json, status_code, response, status_message)``:
        the pretty-printed request payload, the HTTP status code, the parsed
        JSON response (or ``{"raw_response": text}`` when the body is not
        JSON), and a human-readable status line. If anything raises, the
        tuple is ``("", 500, {"error": ...}, failure_message)``.
    """
    print(f"πŸ”„ DEBUG: send_request called with agent_url='{agent_url}', request_type='{request_type}'")

    try:
        if request_type == "manifest":
            envelope = build_manifest_request_envelope(agent_url)
            print(f"πŸ“¦ DEBUG: Built manifest envelope for {agent_url}")
        else:  # utterance
            envelope = build_utterance_envelope(agent_url, message_text)
            print(f"πŸ“¦ DEBUG: Built utterance envelope for {agent_url}")

        # Serialise with the library, then re-wrap under the "openFloor" key.
        json_str = envelope.to_json()
        print(f"πŸ“ DEBUG: Envelope JSON length: {len(json_str)} chars")
        print(f"πŸ“ DEBUG: Full envelope JSON: {json_str}")

        payload = {"openFloor": json.loads(json_str)}
        json_payload_pretty = json.dumps(payload, indent=2)
        print(f"πŸ“€ DEBUG: Full request payload: {json_payload_pretty}")
        print(f"πŸ“€ DEBUG: Request payload preview: {json_payload_pretty[:200]}...")

        headers = {'Content-Type': 'application/json'}
        print(f"🌐 DEBUG: Sending {request_type} request to {agent_url}")
        print(f"πŸ”§ DEBUG: Headers: {headers}")

        response = requests.post(agent_url, json=payload, headers=headers, timeout=10)
        body_text = response.text
        print(f"πŸ“₯ DEBUG: Response status: {response.status_code}")
        print(f"πŸ“₯ DEBUG: Response headers: {dict(response.headers)}")
        print(f"πŸ“₯ DEBUG: Response content length: {len(body_text)} chars")
        print(f"πŸ“₯ DEBUG: Response preview: {body_text[:300]}...")

        # A non-JSON body is not an error here: hand back the raw text instead.
        try:
            response_json = response.json()
        except Exception as json_err:
            print(f"❌ DEBUG: JSON parse error: {json_err}")
            response_json = {"raw_response": response.text}
        else:
            print(f"βœ… DEBUG: Successfully parsed JSON response")
            if isinstance(response_json, dict):
                print(f"πŸ“‹ DEBUG: Response JSON keys: {list(response_json.keys())}")

        status_message = f"βœ… Request sent successfully (HTTP {response.status_code})"
        print(f"🎯 DEBUG: Returning status_message: {status_message}")
        return json_payload_pretty, response.status_code, response_json, status_message

    except Exception as e:
        # Boundary handler: callers get a synthetic 500 instead of an exception.
        print(f"πŸ’₯ DEBUG: Exception in send_request: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        failure = str(e)
        return "", 500, {"error": f"Error: {failure}"}, f"❌ Request failed: {failure}"
class OpenFloorAgent:
    """Handle for one remote Open Floor agent reachable at ``url``.

    ``manifest`` and ``capabilities`` start empty and are filled in by
    ``get_manifest``.
    """

    def __init__(self, url: str, name: str = None):
        self.manifest = None
        self.capabilities = []
        self.url = url
        # Fall back to the URL when no display name is supplied.
        self.name = name or url

    def get_manifest(self):
        """Get the agent's manifest and capabilities.

        Sends a manifest request through ``send_request`` and stores the first
        manifest found in a manifest event on ``self.manifest`` and
        ``self.capabilities``.

        Returns:
            True when a manifest was loaded, False otherwise (including when
            any exception is raised while requesting or parsing).
        """
        print(f"🎯 DEBUG: get_manifest called for {self.url}")

        try:
            _payload, status_code, response_data, _status = send_request(self.url, "manifest")

            print(f"πŸ“Š DEBUG: send_request returned status_code={status_code}")
            print(f"πŸ“Š DEBUG: response_data type: {type(response_data)}")
            print(f"πŸ“Š DEBUG: response_data: {response_data}")

            if self._load_manifest_from_response(status_code, response_data):
                return True

            print(f"❌ DEBUG: get_manifest returning False for {self.url}")
            return False

        except Exception as e:
            print(f"πŸ’₯ DEBUG: Exception in get_manifest for {self.url}: {type(e).__name__}: {e}")
            import traceback
            traceback.print_exc()
            return False

    def _load_manifest_from_response(self, status_code, response_data):
        """Store the first manifest found in ``response_data``; return success."""
        if status_code != 200 or not isinstance(response_data, dict):
            print(f"❌ DEBUG: status_code={status_code}, is_dict={isinstance(response_data, dict)}")
            return False
        print(f"βœ… DEBUG: Got 200 response and dict data")

        if "openFloor" not in response_data:
            print(f"❌ DEBUG: No 'openFloor' key in response")
            return False
        print(f"βœ… DEBUG: Found 'openFloor' key in response")

        if "events" not in response_data["openFloor"]:
            print(f"❌ DEBUG: No 'events' key in openFloor")
            return False
        print(f"βœ… DEBUG: Found 'events' key in openFloor")

        events = response_data["openFloor"]["events"]
        print(f"πŸ“‹ DEBUG: Found {len(events)} events")

        for i, event in enumerate(events):
            print(f"πŸ“‹ DEBUG: Event {i}: {event}")

            # Handle both singular and plural event types
            if event.get("eventType") not in ["publishManifests", "publishManifest"]:
                print(f"❌ DEBUG: Event type '{event.get('eventType')}' is not a manifest event")
                continue
            print(f"βœ… DEBUG: Found {event.get('eventType')} event")

            # Handle different manifest parameter names
            parameters = event.get("parameters", {})
            manifests = None
            if "manifests" in parameters:
                manifests = parameters["manifests"]
                print(f"πŸ“‹ DEBUG: Found 'manifests' key with {len(manifests)} items")
            elif "servicingManifests" in parameters:
                manifests = parameters["servicingManifests"]
                print(f"πŸ“‹ DEBUG: Found 'servicingManifests' key with {len(manifests)} items")
            elif "manifest" in parameters:
                # Single manifest wrapped in array
                manifests = [parameters["manifest"]]
                print(f"πŸ“‹ DEBUG: Found single 'manifest' key, wrapping in array")

            if not (manifests and len(manifests) > 0):
                print(f"❌ DEBUG: No manifests found in parameters: {list(parameters.keys())}")
                continue

            self.manifest = manifests[0]

            # Capabilities might be a single dict or list of dicts
            raw_capabilities = self.manifest.get("capabilities", [])
            if isinstance(raw_capabilities, dict):
                self.capabilities = [raw_capabilities]
            else:
                self.capabilities = raw_capabilities if isinstance(raw_capabilities, list) else []

            print(f"βœ… DEBUG: Successfully loaded manifest with {len(self.capabilities)} capabilities")
            return True

        return False

    def send_utterance(self, message: str) -> str:
        """Send an utterance to the agent and get response.

        Returns:
            The ``value`` of the first text token of the first utterance event
            that has tokens, or a descriptive message when the request fails,
            the status is not 200, or no text is found.
        """
        try:
            _payload, status_code, response_data, _status = send_request(self.url, "utterance", message)

            if status_code != 200 or not isinstance(response_data, dict):
                return f"Agent error: HTTP {status_code}"

            if "openFloor" in response_data and "events" in response_data["openFloor"]:
                for event in response_data["openFloor"]["events"]:
                    if event.get("eventType") != "utterance":
                        continue
                    # Walk parameters -> dialogEvent -> features -> text -> tokens.
                    tokens = (
                        event.get("parameters", {})
                        .get("dialogEvent", {})
                        .get("features", {})
                        .get("text", {})
                        .get("tokens", [])
                    )
                    if tokens:
                        return tokens[0].get("value", "No response")

            return f"Agent responded but no text found: {response_data}"

        except Exception as e:
            return f"Error communicating with agent: {e}"
class BuiltInFloorManager:
    """In-process registry that picks registered Open Floor agents for a task.

    Agents are kept in ``agent_registry`` keyed by speaker URI. Selection works
    on the ``keyphrases`` and ``descriptions`` lists of each capability.
    """

    def __init__(self):
        # speakerUri -> record with the agent handle, manifest, url,
        # capabilities, status and last_seen timestamp.
        self.agent_registry = {}
        # Not written to anywhere in this class.
        self.active_conversations = {}

    @staticmethod
    def _terms(capability, field: str) -> List[str]:
        """Return the string entries of ``capability[field]``, or [] if malformed."""
        if not isinstance(capability, dict):
            return []
        values = capability.get(field)
        if isinstance(values, str):
            # A bare string is treated as a one-item list, not iterated by char.
            return [values]
        if not isinstance(values, (list, tuple)):
            return []
        return [value for value in values if isinstance(value, str)]

    def register_agent_from_openfloor_agent(self, openfloor_agent: "OpenFloorAgent"):
        """Add an agent to the registry.

        Args:
            openfloor_agent: Agent whose ``manifest`` has already been loaded.

        Returns:
            False when the agent has no (or an empty) manifest, True once
            registered. An existing entry with the same speaker URI is
            overwritten.
        """
        if not openfloor_agent.manifest:
            return False

        # `or {}` also covers an explicit null "identification" in the manifest.
        identification = openfloor_agent.manifest.get("identification") or {}
        speaker_uri = identification.get("speakerUri") if isinstance(identification, dict) else None
        if not speaker_uri:
            # No URI advertised: derive one from the URL without its scheme.
            speaker_uri = f"tag:{openfloor_agent.url.replace('https://', '').replace('http://', '')},2025:agent"

        self.agent_registry[speaker_uri] = {
            'openfloor_agent': openfloor_agent,
            'manifest': openfloor_agent.manifest,
            'url': openfloor_agent.url,
            'capabilities': openfloor_agent.capabilities,
            'status': 'available',
            'last_seen': datetime.now(),
        }

        print(f"πŸ›οΈ Floor Manager: Registered {openfloor_agent.name}")
        return True

    def find_best_agent_for_task(self, task: str) -> Optional["OpenFloorAgent"]:
        """Return the highest-scoring agent for ``task``.

        Each keyphrase contained in the task scores 2; each description with at
        least one word contained in the task scores 1. Matching is by
        case-insensitive substring. When nothing scores above zero, the first
        registered agent is returned.

        Returns:
            The selected agent, or None when the registry is empty.
        """
        if not self.agent_registry:
            return None

        task_lower = task.lower()
        best_agent_info = None
        best_score = 0

        for agent_info in self.agent_registry.values():
            score = 0
            for capability in agent_info.get('capabilities') or []:
                for keyphrase in self._terms(capability, "keyphrases"):
                    if keyphrase.lower() in task_lower:
                        score += 2
                for description in self._terms(capability, "descriptions"):
                    if any(word in task_lower for word in description.lower().split()):
                        score += 1

            # Strictly greater: on a tie the earlier-registered agent wins.
            if score > best_score:
                best_score = score
                best_agent_info = agent_info

        if best_agent_info is None:
            # Nothing matched; the registry is non-empty here, so use its first entry.
            best_agent_info = next(iter(self.agent_registry.values()))

        return best_agent_info['openfloor_agent']

    def get_all_capabilities(self) -> Dict[str, List[Dict]]:
        """Map each registered agent's name to its capability list."""
        return {
            agent_info['openfloor_agent'].name: agent_info.get('capabilities', [])
            for agent_info in self.agent_registry.values()
        }

    def find_agents_with_capability(self, capability_keywords: List[str]) -> List["OpenFloorAgent"]:
        """Return agents with a keyphrase or description containing any keyword.

        Matching is by case-insensitive substring. Blank keywords are dropped
        first: an empty string is a substring of every keyphrase, so keeping
        one would make every agent match.

        Args:
            capability_keywords: Keywords to look for.

        Returns:
            Matching agents in registration order, each at most once per
            registry entry; [] when no usable keyword is given.
        """
        keywords_lower = [
            kw.lower()
            for kw in capability_keywords
            if isinstance(kw, str) and kw.strip()
        ]
        if not keywords_lower:
            return []

        def contains_keyword(text: str) -> bool:
            lowered = text.lower()
            return any(kw in lowered for kw in keywords_lower)

        matching_agents = []
        for agent_info in self.agent_registry.values():
            for capability in agent_info.get('capabilities') or []:
                candidate_texts = (
                    self._terms(capability, "keyphrases")
                    + self._terms(capability, "descriptions")
                )
                if any(contains_keyword(text) for text in candidate_texts):
                    matching_agents.append(agent_info['openfloor_agent'])
                    break  # one hit is enough for this agent

        return matching_agents
# Set to True by _ensure_initialization() after its first run.
_initialized = False
# Agent URL -> agent whose manifest was fetched successfully.
_agents_cache: Dict[str, OpenFloorAgent] = {}
# Single shared registry used by the tool functions below.
_floor_manager = BuiltInFloorManager()
def _ensure_initialization():
    """Ensure MCP server is properly initialized.

    On the first call, pre-loads every agent URL listed (comma-separated) in
    the ``OPENFLOOR_AGENTS`` environment variable into ``_agents_cache`` and
    the floor manager. Later calls return immediately.
    """
    global _initialized
    if _initialized:
        return

    print("πŸ”§ Initializing MCP-Open Floor Bridge...")

    # Pre-discover agents from environment if available
    configured = os.getenv("OPENFLOOR_AGENTS", "")
    if configured:
        env_urls = [part.strip() for part in configured.split(",") if part.strip()]
        print(f"🌍 Pre-loading {len(env_urls)} agents from environment...")

        for url in env_urls:
            if not url or url in _agents_cache:
                continue
            try:
                agent = OpenFloorAgent(url)
                if not agent.get_manifest():
                    print(f"❌ Failed to pre-load agent from: {url}")
                    continue
                _agents_cache[url] = agent
                _floor_manager.register_agent_from_openfloor_agent(agent)
                print(f"βœ… Pre-loaded agent: {agent.name} at {url}")
            except Exception as e:
                print(f"❌ Error pre-loading agent {url}: {e}")

    _initialized = True
    print("πŸš€ MCP-Open Floor Bridge initialized")
def _discover_agents_from_env_and_headers(request: gr.Request) -> Dict[str, OpenFloorAgent]:
    """Discover agents named in the environment and in the request headers.

    URLs come from the ``OPENFLOOR_AGENTS`` environment variable and the
    ``x-openfloor-agents`` request header (both comma-separated). Each URL not
    already cached is asked for its manifest and, on success, cached and
    registered with the floor manager.

    Args:
        request: Incoming request; may be falsy, in which case no headers
            are read.

    Returns:
        The module-level ``_agents_cache`` (URL -> agent).
    """
    _ensure_initialization()

    candidate_urls = []

    # Environment variable
    configured = os.getenv("OPENFLOOR_AGENTS", "")
    if configured:
        env_urls = [part.strip() for part in configured.split(",") if part.strip()]
        candidate_urls.extend(env_urls)
        print(f"🌍 Found {len(env_urls)} agents from OPENFLOOR_AGENTS env var")

    # Request headers
    headers = dict(request.headers) if request else {}
    if "x-openfloor-agents" in headers:
        header_value = headers["x-openfloor-agents"]
        header_urls = [part.strip() for part in header_value.split(",") if part.strip()]
        candidate_urls.extend(header_urls)
        print(f"πŸ“‘ Found {len(header_urls)} agents from x-openfloor-agents header")

    # Remove duplicates while keeping first-seen order.
    unique_urls = list(dict.fromkeys(candidate_urls))
    print(f"πŸ” Total unique agent URLs to discover: {len(unique_urls)}")

    # Register agents
    for url in unique_urls:
        if not url or url in _agents_cache:
            continue
        try:
            agent = OpenFloorAgent(url)
            if not agent.get_manifest():
                print(f"❌ Failed to get manifest from: {url}")
                continue
            _agents_cache[url] = agent
            _floor_manager.register_agent_from_openfloor_agent(agent)
            print(f"βœ… Discovered and registered agent: {agent.name} at {url}")
        except Exception as e:
            print(f"❌ Error discovering agent {url}: {e}")

    return _agents_cache
def discover_openfloor_agents(request: gr.Request) -> str:
    """List every discovered Open Floor agent as Markdown.

    Returns:
        A Markdown report with each agent's name, URL, role, synopsis and
        capabilities, or setup instructions when no agent is known.
    """
    agents = _discover_agents_from_env_and_headers(request)

    if not agents:
        return """❌ No agents discovered.
**Setup Options:**
1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`
2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`
3. **Both**: Environment + header URLs will be combined"""

    # Collect the pieces and join once at the end.
    parts = ["πŸ€– **Discovered Open Floor Agents:**\n\n"]

    for url, agent in agents.items():
        parts.append(f"**{agent.name}**\n")
        parts.append(f"- URL: {url}\n")

        if agent.manifest:
            identification = agent.manifest.get("identification", {})
            parts.append(f"- Role: {identification.get('role', 'Unknown')}\n")
            parts.append(f"- Synopsis: {identification.get('synopsis', 'No description')}\n")

        parts.append(f"- Capabilities: {len(agent.capabilities)}\n")
        for cap in agent.capabilities:
            keyphrases = cap.get("keyphrases", [])
            descriptions = cap.get("descriptions", [])
            if keyphrases:
                parts.append(f" β€’ Keywords: {', '.join(keyphrases)}\n")
            if descriptions:
                parts.append(f" β€’ Description: {', '.join(descriptions)}\n")

        parts.append("\n")

    return "".join(parts)
def send_message_to_openfloor_agent(agent_url: str, message: str, request: gr.Request) -> str:
    """Send a message to the agent registered under ``agent_url``.

    Args:
        agent_url: URL of an already-discovered agent.
        message: Text to send as an utterance.
        request: Incoming request, used for agent discovery.

    Returns:
        The agent's reply text, or an error message when the URL is unknown.
    """
    known_agents = _discover_agents_from_env_and_headers(request)

    if agent_url in known_agents:
        return known_agents[agent_url].send_utterance(message)

    return f"❌ Agent not found: {agent_url}. Please discover agents first or check the URL."
def send_to_best_openfloor_agent(task_description: str, request: gr.Request) -> str:
    """Send a task to the agent the floor manager picks for it.

    Args:
        task_description: Task text; used for agent selection and sent as the
            utterance.
        request: Incoming request, used for agent discovery.

    Returns:
        Markdown naming the selected agent and its response, or setup
        instructions when no agent is available.
    """
    _discover_agents_from_env_and_headers(request)

    chosen = _floor_manager.find_best_agent_for_task(task_description)
    if chosen:
        reply = chosen.send_utterance(task_description)
        return f"πŸ›οΈ **Floor Manager Selected: {chosen.name}**\n\nπŸ€– **Response**: {reply}"

    return """❌ No agents available.
**Setup Options:**
1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`
2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`"""
def execute_agent_capability(capability_keywords: str, task_request: str, request: gr.Request) -> str:
    """Send a task to the first agent whose capabilities match the keywords.

    Args:
        capability_keywords: Comma-separated keywords to match against agent
            keyphrases and descriptions.
        task_request: Text sent to the selected agent as an utterance.
        request: Incoming request, used for agent discovery.

    Returns:
        Markdown naming the selected agent and its response, or a message
        listing up to five keyphrases per known agent when nothing matches.
    """
    _discover_agents_from_env_and_headers(request)

    # Drop blank entries (e.g. from a trailing comma or empty input): an empty
    # keyword is a substring of every keyphrase and would match every agent.
    keywords = [kw.strip() for kw in capability_keywords.split(",") if kw.strip()]
    matching_agents = _floor_manager.find_agents_with_capability(keywords)

    if not matching_agents:
        summary_lines = []
        for agent_name, capabilities in _floor_manager.get_all_capabilities().items():
            all_keywords = []
            for cap in capabilities:
                all_keywords.extend(cap.get("keyphrases", []))
            # Show at most five keyphrases per agent.
            summary_lines.append(f"\n- {agent_name}: " + ", ".join(all_keywords[:5]))
        capability_summary = "".join(summary_lines)

        return f"❌ No agents found with capabilities matching: {capability_keywords}\n\nAvailable capabilities:{capability_summary}"

    selected_agent = matching_agents[0]
    response = selected_agent.send_utterance(task_request)

    return f"🎯 **Selected Agent: {selected_agent.name}** (matched: {capability_keywords})\n\nπŸ€– **Response**: {response}"
def list_all_agent_capabilities(request: gr.Request) -> str:
    """List the capabilities of every registered agent as Markdown.

    Returns:
        A Markdown report with each agent's keyphrases, descriptions and
        languages per capability, or setup instructions when no agent is
        registered.
    """
    _discover_agents_from_env_and_headers(request)

    capabilities_by_agent = _floor_manager.get_all_capabilities()
    if not capabilities_by_agent:
        return """❌ No agents discovered.
**Setup Options:**
1. **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`
2. **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`"""

    # Collect the pieces and join once at the end.
    parts = ["🎯 **All Agent Capabilities:**\n\n"]

    for agent_name, capabilities in capabilities_by_agent.items():
        parts.append(f"**{agent_name}:**\n")

        if not capabilities:
            parts.append(" - No capabilities defined\n\n")
            continue

        for index, capability in enumerate(capabilities, 1):
            parts.append(f" **Capability {index}:**\n")

            keyphrases = capability.get("keyphrases", [])
            if keyphrases:
                parts.append(f" β€’ Keywords: {', '.join(keyphrases)}\n")

            descriptions = capability.get("descriptions", [])
            if descriptions:
                parts.append(f" β€’ Can do: {', '.join(descriptions)}\n")

            languages = capability.get("languages", [])
            if languages:
                parts.append(f" β€’ Languages: {', '.join(languages)}\n")

            parts.append("\n")

        parts.append("\n")

    return "".join(parts)
def send_task_to_agents_with_keywords(keywords: str, task: str, request: gr.Request) -> str:
    """Send a task to every agent whose capabilities match the keywords.

    Args:
        keywords: Comma-separated keywords to match against agent keyphrases
            and descriptions.
        task: Text sent to each matching agent as an utterance.
        request: Incoming request, used for agent discovery.

    Returns:
        Markdown with one response per matching agent, or a message when no
        agent matches.
    """
    _discover_agents_from_env_and_headers(request)

    # Drop blank entries (e.g. from a trailing comma or empty input): an empty
    # keyword is a substring of every keyphrase, so the task would otherwise
    # be sent to every registered agent.
    keyword_list = [kw.strip() for kw in keywords.split(",") if kw.strip()]
    matching_agents = _floor_manager.find_agents_with_capability(keyword_list)

    if not matching_agents:
        return f"❌ No agents found with capabilities matching: {keywords}"

    parts = [f"🎯 **Found {len(matching_agents)} agents matching '{keywords}':**\n\n"]

    # Agents are contacted one after another, in registration order.
    for agent in matching_agents:
        response = agent.send_utterance(task)
        parts.append(f"**{agent.name}:** {response}\n\n")

    return "".join(parts)
with gr.Blocks(title="MCP-Open Floor Bridge", theme=gr.themes.Soft()) as demo:
    # Static banner with setup instructions.
    gr.Markdown("""
# πŸŒ‰ MCP-Open Floor Bridge Server
**πŸ†• Please use the new version available under [https://mcp.openfloor.dev](https://mcp.openfloor.dev).**
## Setup:
- **Environment Variable**: `export OPENFLOOR_AGENTS="https://agent1.com,https://agent2.com"`
- **Request Header**: `x-openfloor-agents: https://agent1.com,https://agent2.com`
""")

    # Environment variable status check
    env_agents = os.getenv("OPENFLOOR_AGENTS", "")
    if not env_agents:
        gr.Markdown("""
### ⚠️ Environment Variable Status
**OPENFLOOR_AGENTS** is not set. Agents will be discovered from request headers only.
""")
    else:
        agent_urls = [url.strip() for url in env_agents.split(",") if url.strip()]
        agent_list = '\n'.join(f"- {url}" for url in agent_urls)
        gr.Markdown("### βœ… Environment Variable Status")
        gr.Markdown(f"**OPENFLOOR_AGENTS** is set with **{len(agent_urls)} agent(s)**:")
        gr.Markdown(agent_list)

    # Expose functions as MCP tools (registration order preserved).
    for _tool in (
        discover_openfloor_agents,
        send_message_to_openfloor_agent,
        send_to_best_openfloor_agent,
        execute_agent_capability,
        list_all_agent_capabilities,
        send_task_to_agents_with_keywords,
    ):
        gr.api(_tool)
# Script entry point: serve the Blocks app with the MCP server enabled.
if __name__ == "__main__":
    launch_options = {"share": False, "debug": False, "mcp_server": True}
    demo.launch(**launch_options)