""" | |
app.py β Hugging Face Space | |
Swaps Anthropic for HF Serverless Inference (Qwen3-235B-A22B) | |
""" | |
import asyncio | |
import os | |
import json | |
from typing import List, Dict, Any, Union | |
from contextlib import AsyncExitStack | |
import gradio as gr | |
from gradio.components.chatbot import ChatMessage | |
from mcp import ClientSession, StdioServerParameters | |
from mcp.client.stdio import stdio_client | |
from dotenv import load_dotenv | |
from huggingface_hub import InferenceClient # NEW β¨ | |
load_dotenv() | |
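
# Gradio event handlers are synchronous, so the async MCP calls below are
# driven by run_until_complete() on this dedicated event loop.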
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


class MCPClientWrapper:
    """
    Wraps an MCP stdio client + a chat LLM (Qwen3-235B-A22B via HF Serverless).
    """

    def __init__(self):
        self.session = None
        self.exit_stack = None
        self.tools: List[Dict[str, Any]] = []
        # --- NEW: Hugging Face client ---------------------------------------
        self.hf_client = InferenceClient(
            model="Qwen/Qwen3-235B-A22B",
            token=os.getenv("HUGGINGFACE_API_TOKEN"),
        )
        # --------------------------------------------------------------------

    # ─────────────────────────── MCP CONNECTION ────────────────────────────
    def connect(self, server_path: str) -> str:
        return loop.run_until_complete(self._connect(server_path))

    async def _connect(self, server_path: str) -> str:
        if self.exit_stack:
            await self.exit_stack.aclose()
        self.exit_stack = AsyncExitStack()
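        # Pick the interpreter from the script extension: ".py" runs under
        # python, anything else is assumed to be a Node script.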
        is_python = server_path.endswith(".py")
        command = "python" if is_python else "node"

        server_params = StdioServerParameters(
            command=command,
            args=[server_path],
            env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"},
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()

        response = await self.session.list_tools()
        self.tools = [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
            for tool in response.tools
        ]
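        # NOTE: the tool list is kept for the connection status message only;
        # _process_query below does not forward tool schemas to the LLM.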
        tool_names = [tool["name"] for tool in self.tools]
        return f"Connected to MCP server. Available tools: {', '.join(tool_names)}"

    # ──────────────────────────── CHAT HANDLER ─────────────────────────────
    def process_message(
        self, message: str, history: List[Union[Dict[str, Any], ChatMessage]]
    ) -> tuple:
        if not self.session:
            return (
                history
                + [
                    {"role": "user", "content": message},
                    {
                        "role": "assistant",
                        "content": "Please connect to an MCP server first.",
                    },
                ],
                gr.Textbox(value=""),
            )
        new_messages = loop.run_until_complete(self._process_query(message, history))
        return (
            history + [{"role": "user", "content": message}] + new_messages,
            gr.Textbox(value=""),
        )

    # ────────────────────────── INTERNAL LLM CALL ──────────────────────────
    async def _process_query(
        self, message: str, history: List[Union[Dict[str, Any], ChatMessage]]
    ):
        """
        Pushes the whole chat history to Qwen3-235B-A22B and returns its reply.
        Tool calls are *not* forwarded; the HF endpoint only returns text.
        """
        # 1️⃣ Build the message list as OpenAI-style dicts
        messages: List[Dict[str, str]] = []
        for item in history:
            if isinstance(item, ChatMessage):
                role, content = item.role, item.content
            else:
                role, content = item.get("role"), item.get("content")
            if role in {"user", "assistant", "system"}:
                messages.append({"role": role, "content": content})
        messages.append({"role": "user", "content": message})

        # 2️⃣ Serialise to Qwen chat markup (ChatML)
        prompt_parts = []
        for m in messages:
            role = m["role"]
            prompt_parts.append(f"<|im_start|>{role}\n{m['content']}<|im_end|>")
        prompt_parts.append("<|im_start|>assistant")  # model will complete here
        prompt = "\n".join(prompt_parts)
        # 3️⃣ Call HF Serverless in a threadpool (non-blocking).
        # run_in_executor expects a synchronous callable, so _generate is a
        # plain def rather than an async def.
        def _generate() -> str:
            return self.hf_client.text_generation(
                prompt,
                max_new_tokens=1024,
                temperature=0.7,
                stop_sequences=["<|im_end|>", "<|im_start|>"],
            )

        assistant_text: str = await asyncio.get_running_loop().run_in_executor(
            None, _generate
        )

        # 4️⃣ Return in a Gradio-friendly format
        return [{"role": "assistant", "content": assistant_text.strip()}]


# ──────────────────────────────── GRADIO UI ────────────────────────────────
client = MCPClientWrapper()


def gradio_interface():
    with gr.Blocks(title="MCP Weather Client") as demo:
        gr.Markdown("# MCP Weather Assistant")
        gr.Markdown("Connect to your MCP weather server and chat with the assistant.")

        with gr.Row(equal_height=True):
            with gr.Column(scale=4):
                server_path = gr.Textbox(
                    label="Server Script Path",
                    placeholder="Enter path to server script (e.g., weather.py)",
                    value="gradio_mcp_server.py",
                )
            with gr.Column(scale=1):
                connect_btn = gr.Button("Connect")

        status = gr.Textbox(label="Connection Status", interactive=False)

        chatbot = gr.Chatbot(
            value=[],
            height=500,
            type="messages",
            show_copy_button=True,
            avatar_images=("👤", "🤖"),
        )

        with gr.Row(equal_height=True):
            msg = gr.Textbox(
                label="Your Question",
                placeholder="Ask about weather or alerts (e.g., What's the weather in New York?)",
                scale=4,
            )
            clear_btn = gr.Button("Clear Chat", scale=1)

        connect_btn.click(client.connect, inputs=server_path, outputs=status)
        msg.submit(client.process_message, [msg, chatbot], [chatbot, msg])
        clear_btn.click(lambda: [], None, chatbot)

    return demo
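
# Chatbot(type="messages") expects role/content dicts, which is exactly what
# process_message returns, so chat history flows straight through unchanged.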


# ─────────────────────────────── ENTRY POINT ───────────────────────────────
if __name__ == "__main__":
    if not os.getenv("HUGGINGFACE_API_TOKEN"):
        print(
            "Warning: HUGGINGFACE_API_TOKEN not found in environment. "
            "Set it in your .env file or Space secrets."
        )
    interface = gradio_interface()
    interface.launch(debug=True)
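
# Local usage (a sketch; assumes gradio_mcp_server.py sits next to this file):
#   export HUGGINGFACE_API_TOKEN=hf_xxx   # or put it in your .env file
#   python app.py
# Then open the printed URL, click "Connect", and start chatting.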