"""
app.py – Hugging Face Space
Swaps Anthropic for HF Serverless Inference (Qwen3-235B-A22B)
"""
import asyncio
import os
import json
from typing import List, Dict, Any, Union
from contextlib import AsyncExitStack
import gradio as gr
from gradio.components.chatbot import ChatMessage
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from dotenv import load_dotenv
from huggingface_hub import InferenceClient  # NEW ✨
load_dotenv()
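# A single module-level event loop is reused so Gradio's synchronous callbacks
# can drive the async MCP client via loop.run_until_complete().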
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class MCPClientWrapper:
"""
Wraps an MCP stdio client + a chat LLM (Qwen3-235B-A22B via HF Serverless).
"""
def __init__(self):
self.session = None
self.exit_stack = None
self.tools: List[Dict[str, Any]] = []
# --- NEW: Hugging Face client ---------------------------------------
self.hf_client = InferenceClient(
model="Qwen/Qwen3-235B-A22B",
token=os.getenv("HUGGINGFACE_API_TOKEN")
)
# --------------------------------------------------------------------
# ─────────────────────────── MCP CONNECTION ────────────────────────────
def connect(self, server_path: str) -> str:
return loop.run_until_complete(self._connect(server_path))
async def _connect(self, server_path: str) -> str:
if self.exit_stack:
await self.exit_stack.aclose()
self.exit_stack = AsyncExitStack()
is_python = server_path.endswith(".py")
command = "python" if is_python else "node"
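        # The script extension picks the launcher: .py files are run with the
        # Python interpreter, anything else is assumed to be a Node script.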
server_params = StdioServerParameters(
command=command,
args=[server_path],
env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"},
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
response = await self.session.list_tools()
self.tools = [
{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema,
}
for tool in response.tools
]
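        # The tool list is cached only for the connection-status message;
        # _process_query does not forward it to the HF text endpoint (see below).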
tool_names = [tool["name"] for tool in self.tools]
return f"Connected to MCP server. Available tools: {', '.join(tool_names)}"
# ──────────────────────────── CHAT HANDLER ─────────────────────────────
def process_message(
self, message: str, history: List[Union[Dict[str, Any], ChatMessage]]
) -> tuple:
if not self.session:
return (
history
+ [
{"role": "user", "content": message},
{
"role": "assistant",
"content": "Please connect to an MCP server first.",
},
],
gr.Textbox(value=""),
)
new_messages = loop.run_until_complete(self._process_query(message, history))
return (
history + [{"role": "user", "content": message}] + new_messages,
gr.Textbox(value=""),
)
# ────────────────────────── INTERNAL LLM CALL ─────────────────────────
async def _process_query(
self, message: str, history: List[Union[Dict[str, Any], ChatMessage]]
):
"""
Pushes the whole chat history to Qwen3-235B-A22B and returns its reply.
Tool calls are *not* forwarded – the HF endpoint only returns text.
"""
# 1️⃣ Build message list in OpenAI-style dicts
messages: List[Dict[str, str]] = []
for item in history:
if isinstance(item, ChatMessage):
role, content = item.role, item.content
else:
role, content = item.get("role"), item.get("content")
if role in {"user", "assistant", "system"}:
messages.append({"role": role, "content": content})
messages.append({"role": "user", "content": message})
# 2️⃣ Serialise to Qwen chat-markup
prompt_parts = []
for m in messages:
role = m["role"]
prompt_parts.append(f"<|im_start|>{role}\n{m['content']}<|im_end|>")
prompt_parts.append("<|im_start|>assistant") # model will complete here
prompt = "\n".join(prompt_parts)
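        # Example of the resulting ChatML-style prompt for a single user turn:
        #   <|im_start|>user
        #   What's the weather in New York?<|im_end|>
        #   <|im_start|>assistant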
# 3️⃣ Call HF Serverless in a threadpool (non-blocking)
        def _generate():  # plain (sync) callable: run_in_executor expects a function, not a coroutine
return self.hf_client.text_generation(
prompt,
max_new_tokens=1024,
temperature=0.7,
stop_sequences=["<|im_end|>", "<|im_start|>"],
)
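        # text_generation() performs a blocking HTTP request, so it is run in the
        # default thread-pool executor to avoid stalling the event loop.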
assistant_text: str = await asyncio.get_running_loop().run_in_executor(
None, _generate
)
# 4️⃣ Return in Gradio-friendly format
return [{"role": "assistant", "content": assistant_text.strip()}]
# ──────────────────────────── GRADIO UI ───────────────────────────────────
client = MCPClientWrapper()
def gradio_interface():
with gr.Blocks(title="MCP Weather Client") as demo:
gr.Markdown("# MCP Weather Assistant")
gr.Markdown("Connect to your MCP weather server and chat with the assistant")
with gr.Row(equal_height=True):
with gr.Column(scale=4):
server_path = gr.Textbox(
label="Server Script Path",
placeholder="Enter path to server script (e.g., weather.py)",
value="gradio_mcp_server.py",
)
with gr.Column(scale=1):
connect_btn = gr.Button("Connect")
status = gr.Textbox(label="Connection Status", interactive=False)
chatbot = gr.Chatbot(
value=[],
height=500,
type="messages",
show_copy_button=True,
            avatar_images=("👤", "🤖"),
)
with gr.Row(equal_height=True):
msg = gr.Textbox(
label="Your Question",
placeholder="Ask about weather or alerts (e.g., What's the weather in New York?)",
scale=4,
)
clear_btn = gr.Button("Clear Chat", scale=1)
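        # Wire the UI events: connect to the MCP server, submit chat messages,
        # and clear the conversation history.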
connect_btn.click(client.connect, inputs=server_path, outputs=status)
msg.submit(client.process_message, [msg, chatbot], [chatbot, msg])
clear_btn.click(lambda: [], None, chatbot)
return demo
# ──────────────────────────── ENTRY POINT ────────────────────────────────
if __name__ == "__main__":
if not os.getenv("HUGGINGFACE_API_TOKEN"):
print(
"Warning: HUGGINGFACE_API_TOKEN not found in environment. "
"Set it in your .env file or Space secrets."
)
interface = gradio_interface()
    interface.launch(debug=True)