## example workflow

{ "workflow_id": "simple-chatbot-v1", "workflow_name": "Simple Chatbot", "nodes": [ { "id": "ChatInput-1", "type": "ChatInput", "data": { "display_name": "User's Question", "template": { "input_value": { "display_name": "Input", "type": "string", "value": "What is the capital of France?", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }, { "id": "Prompt-1", "type": "Prompt", "data": { "display_name": "System Prompt", "template": { "prompt_template": { "display_name": "Template", "type": "string", "value": "You are a helpful geography expert. The user asked: {input_value}", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }, { "id": "OpenAI-1", "type": "OpenAIModel", "data": { "display_name": "OpenAI gpt-4o-mini", "template": { "model": { "display_name": "Model", "type": "options", "options": ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"], "value": "gpt-4o-mini" }, "api_key": { "display_name": "API Key", "type": "SecretStr", "required": true, "env_var": "OPENAI_API_KEY" }, "prompt": { "display_name": "Prompt", "type": "string", "is_handle": true } } }, "resources": { "cpu": 0.5, "memory": "256Mi", "gpu": "none" } }, { "id": "ChatOutput-1", "type": "ChatOutput", "data": { "display_name": "Final Answer", "template": { "response": { "display_name": "Response", "type": "string", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } } ], "edges": [ { "source": "ChatInput-1", "source_handle": "input_value", "target": "Prompt-1", "target_handle": "prompt_template" }, { "source": "Prompt-1", "source_handle": "prompt_template", "target": "OpenAI-1", "target_handle": "prompt" }, { "source": "OpenAI-1", "source_handle": "response", "target": "ChatOutput-1", "target_handle": "response" } ] }

## input node

{ "id": "Input-1", "type": "Input", "data": { "display_name": "Source Data", "template": { "data_type": { "display_name": "Data Type", "type": "options", "options": ["string", "image", "video", "audio", "file"], "value": "string" }, "value": { "display_name": "Value or Path", "type": "string", "value": "This is the initial text." }, "data": { "display_name": "Output Data", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }

from typing import Any, Dict

def process_input(data_type: str, value: Any) -> Dict[str, Any]:
    """
    Packages the source data and its type for downstream nodes.
    """
    # The output is a dictionary containing both the type and the data/path.
    # This gives the next node context on how to handle the value.
    output_package = {
        "type": data_type,
        "value": value
    }
    return {"data": output_package}

process_input("string", "hi")

## output node

"""{ "id": "Output-1", "type": "Output", "data": { "display_name": "Final Result", "template": { "input_data": { "display_name": "Input Data", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }"""

from typing import Any, Dict

def process_output(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Receives the final data package and returns its contents.
    """
    # Unpack the dictionary received from the upstream node.
    data_type = input_data.get("type", "unknown")
    value = input_data.get("value", "No value provided")
    # Don't print the output; just return it so downstream consumers can use it.
    return {"data_type": data_type, "value": value}

process_output({'type': 'string', 'value': 'hi'})
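The workflow JSON above only declares nodes and edges; nothing in this section shows how data actually moves across `source_handle` / `target_handle`. The sketch below is an illustrative mini-runner (an assumption, not the real engine): it seeds each node's inputs from its template values, walks the edges in order, and pipes each output handle into the downstream input handle, demonstrated with the Input and Output node functions defined above.

from typing import Any, Callable, Dict

def run_workflow(workflow: Dict[str, Any],
                 handlers: Dict[str, Callable[..., Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
    # Seed each node's inputs with the static template values (fields that carry a "value").
    nodes = {n["id"]: n for n in workflow["nodes"]}
    inputs = {
        nid: {name: f["value"] for name, f in n["data"]["template"].items() if "value" in f}
        for nid, n in nodes.items()
    }
    outputs: Dict[str, Dict[str, Any]] = {}

    def execute(nid: str) -> None:
        if nid not in outputs:
            outputs[nid] = handlers[nodes[nid]["type"]](**inputs[nid])

    # Assumes the edge list is already in execution order, as in the examples here.
    for edge in workflow["edges"]:
        execute(edge["source"])
        inputs[edge["target"]][edge["target_handle"]] = outputs[edge["source"]][edge["source_handle"]]
    for nid in nodes:
        execute(nid)
    return outputs

# Tiny two-node workflow wired from the Input and Output nodes above (illustrative only).
mini_workflow = {
    "nodes": [
        {"id": "Input-1", "type": "Input", "data": {"template": {
            "data_type": {"value": "string"},
            "value": {"value": "This is the initial text."},
            "data": {"is_handle": True},
        }}},
        {"id": "Output-1", "type": "Output", "data": {"template": {
            "input_data": {"is_handle": True},
        }}},
    ],
    "edges": [
        {"source": "Input-1", "source_handle": "data",
         "target": "Output-1", "target_handle": "input_data"},
    ],
}

# process_input / process_output are the node functions defined in this section.
print(run_workflow(mini_workflow, {"Input": process_input, "Output": process_output}))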
## api request node

"""{ "id": "APIRequest-1", "type": "APIRequest", "data": { "display_name": "Get User Data", "template": { "url": { "display_name": "URL", "type": "string", "value": "https://api.example.com/users/1" }, "method": { "display_name": "Method", "type": "options", "options": ["GET", "POST", "PUT", "DELETE"], "value": "GET" }, "headers": { "display_name": "Headers (JSON)", "type": "string", "value": "{\"Authorization\": \"Bearer YOUR_TOKEN\"}" }, "body": { "display_name": "Request Body", "type": "object", "is_handle": true }, "response": { "display_name": "Response Data", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.2, "memory": "256Mi", "gpu": "none" } }"""

import requests
import json
from typing import Any, Dict

def process_api_request(url: str, method: str, headers: str, body: Dict = None) -> Dict[str, Any]:
    """
    Performs an HTTP request and returns the JSON response.
    """
    try:
        parsed_headers = json.loads(headers)
    except json.JSONDecodeError:
        print("Warning: Headers are not valid JSON. Using empty headers.")
        parsed_headers = {}

    try:
        response = requests.request(
            method=method,
            url=url,
            headers=parsed_headers,
            json=body,
            timeout=10  # 10-second timeout
        )
        # Raise an exception for bad status codes (4xx or 5xx)
        response.raise_for_status()
        # The output is a dictionary containing the JSON response.
        return {"response": response.json()}
    except requests.exceptions.RequestException as e:
        print(f"Error during API request: {e}")
        # Return an error structure on failure
        return {"response": {"error": str(e), "status_code": getattr(e.response, 'status_code', 500)}}

# GET example
url = "https://jsonplaceholder.typicode.com/posts"
method = "GET"
headers = "{}"  # empty JSON headers
body = None  # GET requests typically don't send a JSON body
result = process_api_request(url, method, headers, body)
print(result)

# POST example
url = "https://jsonplaceholder.typicode.com/posts"
method = "POST"
headers = '{"Content-Type": "application/json"}'
body = {
    "title": "foo",
    "body": "bar",
    "userId": 1
}
result = process_api_request(url, method, headers, body)
print(result)
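On failure, `process_api_request` does not raise; it returns the error structure on the `response` handle, which is what a downstream node (for example the ConditionalLogic node later in this document) would branch on. A quick, hedged illustration — the 404 URL is made up:

# Hypothetical failing call: a 404 triggers raise_for_status(), so the node
# returns {"response": {"error": ..., "status_code": 404}} instead of raising.
bad = process_api_request("https://jsonplaceholder.typicode.com/does-not-exist", "GET", "{}")
print("error" in bad["response"])  # True when the request failed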
## react agent tool

import os
import asyncio
from typing import List, Dict, Any
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from duckduckgo_search import DDGS

# Set your API key
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"

class WorkflowReActAgent:
    """Complete working ReAct Agent with your workflow tools"""

    def __init__(self, llm_model: str = "gpt-4o-mini"):
        self.llm = OpenAI(model=llm_model, temperature=0.1)
        self.tools = self._create_tools()
        self.agent = ReActAgent.from_tools(
            tools=self.tools,
            llm=self.llm,
            verbose=True,
            max_iterations=8  # Reasonable limit
        )

    def _create_tools(self) -> List[FunctionTool]:
        """Create tools that actually work and get used"""

        # Web Search Tool (using your exact implementation)
        def web_search(query: str) -> str:
            """Search the web for current information"""
            try:
                with DDGS() as ddgs:
                    results = []
                    gen = ddgs.text(query, safesearch="Off")
                    for i, result in enumerate(gen):
                        if i >= 3:  # Limit results
                            break
                        results.append(f"• {result.get('title', '')}: {result.get('body', '')[:150]}...")
                if results:
                    return f"Search results: {'; '.join(results)}"
                else:
                    return f"No results found for '{query}'"
            except Exception as e:
                return f"Search error: {str(e)}"

        # Calculator Tool
        def calculate(expression: str) -> str:
            """Calculate mathematical expressions safely"""
            try:
                # Simple and safe evaluation
                allowed_chars = "0123456789+-*/().,_ "
                if all(c in allowed_chars for c in expression):
                    result = eval(expression)
                    return f"Result: {result}"
                else:
                    return f"Invalid expression: {expression}"
            except Exception as e:
                return f"Math error: {str(e)}"

        # Python Executor Tool
        def execute_python(code: str) -> str:
            """Execute Python code and return results"""
            import sys
            from io import StringIO
            import traceback

            old_stdout = sys.stdout
            sys.stdout = StringIO()
            try:
                local_scope = {}
                exec(code, {"__builtins__": __builtins__}, local_scope)
                output = sys.stdout.getvalue()
                # Get result from the last line if it's an expression
                lines = code.strip().split('\n')
                if lines:
                    try:
                        result = eval(lines[-1], {}, local_scope)
                        return f"Result: {result}\nOutput: {output}".strip()
                    except Exception:
                        pass
                return f"Output: {output}".strip() if output else "Code executed successfully"
            except Exception as e:
                return f"Error: {str(e)}"
            finally:
                sys.stdout = old_stdout

        # API Request Tool
        def api_request(url: str, method: str = "GET") -> str:
            """Make HTTP API requests"""
            import requests
            try:
                response = requests.request(method, url, timeout=10)
                return f"Status: {response.status_code}\nResponse: {response.text[:300]}..."
            except Exception as e:
                return f"API error: {str(e)}"

        # Convert to FunctionTool objects
        return [
            FunctionTool.from_defaults(
                fn=web_search,
                name="web_search",
                description="Search the web for current information on any topic"
            ),
            FunctionTool.from_defaults(
                fn=calculate,
                name="calculate",
                description="Calculate mathematical expressions and equations"
            ),
            FunctionTool.from_defaults(
                fn=execute_python,
                name="execute_python",
                description="Execute Python code for data processing and calculations"
            ),
            FunctionTool.from_defaults(
                fn=api_request,
                name="api_request",
                description="Make HTTP requests to APIs and web services"
            )
        ]

    def chat(self, message: str) -> str:
        """Chat with the ReAct agent"""
        try:
            response = self.agent.chat(message)
            return str(response.response)
        except Exception as e:
            return f"Agent error: {str(e)}"

# Usage examples
def main():
    """Test the working ReAct agent"""
    agent = WorkflowReActAgent()

    test_queries = [
        "What's the current Bitcoin price and calculate 10% of it?",
        "Search for news about SpaceX and tell me the latest",
        "Calculate the compound interest: 1000 * (1.05)^10",
        "Search for Python programming tips",
        "What's 15 factorial divided by 12 factorial?",
        "Find information about the latest iPhone and calculate its price in EUR if 1 USD = 0.92 EUR"
    ]

    print("WorkflowReActAgent Ready!")
    print("=" * 60)
    for i, query in enumerate(test_queries, 1):
        print(f"\nQuery {i}: {query}")
        print("-" * 50)
        response = agent.chat(query)
        print(f"Response: {response}")
        print("\n" + "=" * 60)

if __name__ == "__main__":
    main()
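The agent's built-in `api_request` tool duplicates the APIRequest node above. If you would rather reuse the node function itself, one option is to wrap it in another `FunctionTool` and rebuild the agent with the extra tool. This is a sketch, assuming `WorkflowReActAgent` and `process_api_request` from above are in scope:

import json
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI

def api_node_tool(url: str, method: str = "GET", headers: str = "{}") -> str:
    """Call the workflow's APIRequest node function and return its output as JSON."""
    return json.dumps(process_api_request(url, method, headers))

node_tool = FunctionTool.from_defaults(
    fn=api_node_tool,
    name="api_request_node",
    description="HTTP requests via the workflow's APIRequest node"
)

# Mix the node-backed tool with the agent's existing tools.
base = WorkflowReActAgent()
agent = ReActAgent.from_tools(
    tools=base.tools + [node_tool],
    llm=OpenAI(model="gpt-4o-mini", temperature=0.1),
    verbose=True,
)
# agent.chat("Fetch https://jsonplaceholder.typicode.com/todos/1 and summarize it")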
## web search tool

"""{ "id": "WebSearch-1", "type": "WebSearch", "data": { "display_name": "Search for News", "template": { "query": { "display_name": "Search Query", "type": "string", "is_handle": true }, "results": { "display_name": "Search Results", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.2, "memory": "256Mi", "gpu": "none" } }"""

# First, install duckduckgo_search:
# pip install duckduckgo_search

import json
from typing import Any, Dict, List
from duckduckgo_search import DDGS

def process_web_search(query: str, max_results: int = 10) -> Dict[str, Any]:
    """
    Performs a DuckDuckGo search and returns the results on the "results" handle.
    """
    if not query:
        return {"results": []}
    try:
        # Use the DDGS client and its text() method
        with DDGS() as ddgs:
            gen = ddgs.text(query, safesearch="Off")
            # Collect up to max_results items
            results: List[Dict[str, str]] = [
                {"title": r.get("title", ""), "link": r.get("href", ""), "snippet": r.get("body", "")}
                for _, r in zip(range(max_results), gen)
            ]
        return {"results": results}
    except Exception as e:
        return {"results": {"error": str(e)}}

# import json
# from typing import Any
# from llama_index.tools import BaseTool, ToolMetadata
#
# class DuckDuckGoSearchTool(BaseTool):
#     """A LlamaIndex tool that proxies to process_web_search."""
#     metadata = ToolMetadata(
#         name="duckduckgo_search",
#         description="Performs a web search via DuckDuckGo and returns JSON results."
#     )
#
#     def __init__(self, max_results: int = 10):
#         self.max_results = max_results
#
#     def _run(self, query: str) -> str:
#         # Call our search function and return a JSON string
#         results = process_web_search(query, max_results=self.max_results)
#         return json.dumps(results)
#
#     async def _arun(self, query: str) -> str:
#         # Async agents can await this
#         results = process_web_search(query, max_results=self.max_results)
#         return json.dumps(results)

# from llama_index import GPTVectorStoreIndex, ServiceContext
# from llama_index.agent.react import ReactAgent
# from llama_index.tools import ToolConfig
#
# # 1. Instantiate the tool
# search_tool = DuckDuckGoSearchTool(max_results=5)
#
# # 2. Create an agent and register tools
# agent = ReactAgent(
#     tools=[search_tool],
#     service_context=ServiceContext.from_defaults()
# )
#
# # 3. Run the agent with a natural-language prompt
# response = agent.run("What are the top news about renewable energy?")
# print(response)

process_web_search(query="devil may cry")
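The commented-out `DuckDuckGoSearchTool` above targets an older `llama_index` API (`BaseTool`, `ServiceContext`). With the `llama_index.core` imports already used in the react agent section, wrapping `process_web_search` is shorter; a sketch:

import json
from llama_index.core.tools import FunctionTool

def web_search_node(query: str, max_results: int = 5) -> str:
    """Run the WebSearch node function and return its results as a JSON string."""
    return json.dumps(process_web_search(query, max_results=max_results))

web_search_tool = FunctionTool.from_defaults(
    fn=web_search_node,
    name="duckduckgo_search",
    description="Performs a web search via DuckDuckGo and returns JSON results."
)
# This tool can be passed to ReActAgent.from_tools(...) like the tools in the
# react agent section above.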
## execute python node

"""{ "id": "ExecutePython-1", "type": "ExecutePython", "data": { "display_name": "Custom Data Processing", "template": { "code": { "display_name": "Python Code", "type": "string", "value": "def process(data):\n # Example: Extract titles from search results\n titles = [item['title'] for item in data]\n # The 'result' variable will be the output\n result = ', '.join(titles)\n return result" }, "input_vars": { "display_name": "Input Variables", "type": "object", "is_handle": true }, "output_vars": { "display_name": "Output Variables", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.5, "memory": "512Mi", "gpu": "none" } }"""

import sys
import traceback
from typing import Any, Dict

def process_execute_python(code: str, input_vars: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Executes a string of Python code within an isolated scope.
    - If the code defines `process(data)`, calls it with `input_vars`.
    - Otherwise, executes the code top-level and returns any printed output.
    """
    if input_vars is None:
        input_vars = {}

    # Capture stdout
    from io import StringIO
    old_stdout = sys.stdout
    sys.stdout = StringIO()

    local_scope: Dict[str, Any] = {}
    try:
        # Execute user code
        exec(code, {}, local_scope)
        if "process" in local_scope and callable(local_scope["process"]):
            result = local_scope["process"](input_vars)
        else:
            # No process(): the code above already ran as a script,
            # and any prints were captured on the redirected stdout.
            result = None
        output = sys.stdout.getvalue()
        return {"output_vars": result, "stdout": output}
    except Exception:
        err = traceback.format_exc()
        return {"output_vars": None, "error": err}
    finally:
        sys.stdout = old_stdout

# 1. Code with process():
code1 = """
def process(data):
    return {"sum": data.get("x", 0) + data.get("y", 0)}
"""
print(process_execute_python(code1, {"x": 5, "y": 7}))
# -> {'output_vars': {'sum': 12}, 'stdout': ''}

# 2. Standalone code:
code2 = 'print("Hello, world!")'
print(process_execute_python(code2))
# -> {'output_vars': None, 'stdout': 'Hello, world!\n'}

# import json
# from typing import Any
# from llama_index.tools import BaseTool, ToolMetadata
#
# class ExecutePythonTool(BaseTool):
#     """Executes arbitrary Python code strings in an isolated scope."""
#     metadata = ToolMetadata(
#         name="execute_python",
#         description="Runs user-supplied Python code. Requires optional `process(data)` or runs script."
#     )
#
#     def _run(self, code: str) -> str:
#         # Call the executor and serialize the dict result
#         result = process_execute_python(code)
#         return json.dumps(result)
#
#     async def _arun(self, code: str) -> str:
#         result = process_execute_python(code)
#         return json.dumps(result)

# from llama_index.agent.react import ReactAgent
# from llama_index import ServiceContext
#
# tool = ExecutePythonTool()
# agent = ReactAgent(tools=[tool], service_context=ServiceContext.from_defaults())
#
# # Agent will call `execute_python` when needed.
# response = agent.run("Please run the Python code: print('Test')")
# print(response)

## conditional logic

"""{ "id": "ConditionalLogic-1", "type": "ConditionalLogic", "data": { "display_name": "Check User Role", "template": { "operator": { "display_name": "Operator", "type": "options", "options": ["==", "!=", ">", "<", ">=", "<=", "contains", "not contains"], "value": "==" }, "comparison_value": { "display_name": "Comparison Value", "type": "string", "value": "admin" }, "input_value": { "display_name": "Input to Check", "type": "any", "is_handle": true }, "true_output": { "display_name": "Path if True", "type": "any", "is_handle": true }, "false_output": { "display_name": "Path if False", "type": "any", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }"""

from typing import Any, Dict

def process_conditional_logic(operator: str, comparison_value: str, input_value: Any) -> Dict[str, Any]:
    """
    Evaluates a condition and returns the input value on the appropriate output handle.
""" result = False # Attempt to convert types for numeric comparison try: num_input = float(input_value) num_comp = float(comparison_value) except (ValueError, TypeError): num_input, num_comp = None, None # Evaluate condition if operator == '==' : result = input_value == comparison_value elif operator == '!=': result = input_value != comparison_value elif operator == '>' and num_input is not None: result = num_input > num_comp elif operator == '<' and num_input is not None: result = num_input < num_comp elif operator == '>=' and num_input is not None: result = num_input >= num_comp elif operator == '<=' and num_input is not None: result = num_input <= num_comp elif operator == 'contains': result = str(comparison_value) in str(input_value) elif operator == 'not contains': result = str(comparison_value) not in str(input_value) # Return the input data on the correct output handle based on the result if result: # The key "true_output" matches the source_handle in the workflow edge return {"true_output": input_value} else: # The key "false_output" matches the source_handle in the workflow edge return {"false_output": input_value} ## wait node """{ "id": "Wait-1", "type": "Wait", "data": { "display_name": "Wait for 5 Seconds", "template": { "duration": { "display_name": "Duration (seconds)", "type": "number", "value": 5 }, "passthrough_input": { "display_name": "Passthrough Data In", "type": "any", "is_handle": true }, "passthrough_output": { "display_name": "Passthrough Data Out", "type": "any", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } }""" import time from typing import Any, Dict def process_wait(duration: int, passthrough_input: Any = None) -> Dict[str, Any]: """ Pauses execution for a given duration and then passes data through. """ time.sleep(duration) # The output key "passthrough_output" matches the source_handle return {"passthrough_output": passthrough_input} ## chat node """{ "id": "ChatModel-1", "type": "ChatModel", "data": { "display_name": "AI Assistant", "template": { "provider": { "display_name": "Provider", "type": "options", "options": ["OpenAI", "Anthropic"], "value": "OpenAI" }, "model": { "display_name": "Model Name", "type": "string", "value": "gpt-4o-mini" }, "api_key": { "display_name": "API Key", "type": "SecretStr", "required": true, "env_var": "OPENAI_API_KEY" }, "system_prompt": { "display_name": "System Prompt (Optional)", "type": "string", "value": "You are a helpful assistant." }, "prompt": { "display_name": "Prompt", "type": "string", "is_handle": true }, "response": { "display_name": "Response", "type": "string", "is_handle": true } } }, "resources": { "cpu": 0.5, "memory": "256Mi", "gpu": "none" } }""" import os from typing import Any, Dict from openai import OpenAI from anthropic import Anthropic def process_chat_model(provider: str, model: str, api_key: str, prompt: str, system_prompt: str = "") -> Dict[str, Any]: """ Calls the specified chat model provider with a given prompt. 
""" response_text = "" if provider == "OpenAI": client = OpenAI(api_key=api_key) messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) completion = client.chat.completions.create(model=model, messages=messages) response_text = completion.choices[0].message.content elif provider == "Anthropic": client = Anthropic(api_key=api_key) message = client.messages.create( model=model, max_tokens=2048, system=system_prompt, messages=[{"role": "user", "content": prompt}] ) response_text = message.content[0].text return {"response": response_text} def test_openai(): openai_key = os.getenv("OPENAI_API_KEY") if not openai_key: raise RuntimeError("Set the OPENAI_API_KEY environment variable.") result = process_chat_model( provider="OpenAI", model="gpt-3.5-turbo", api_key=openai_key, system_prompt="You are a helpful assistant.", prompt="What's the capital of France?" ) print("OpenAI response:", result["response"]) def test_anthropic(): anthropic_key = os.getenv("ANTHROPIC_API_KEY") if not anthropic_key: raise RuntimeError("Set the ANTHROPIC_API_KEY environment variable.") result = process_chat_model( provider="Anthropic", model="claude-sonnet-4-20250514", api_key=anthropic_key, system_prompt="You are a concise assistant.", prompt="List three benefits of renewable energy." ) print("Anthropic response:", result["response"]) if __name__ == "__main__": test_openai() test_anthropic() ## rag node 1 knowledge base """{ "id": "KnowledgeBase-1", "type": "KnowledgeBase", "data": { "display_name": "Create Product Docs KB", "template": { "kb_name": { "display_name": "Knowledge Base Name", "type": "string", "value": "product-docs-v1" }, "source_type": { "display_name": "Source Type", "type": "options", "options": ["Directory", "URL"], "value": "URL" }, "path_or_url": { "display_name": "Path or URL", "type": "string", "value": "https://docs.modal.com/get-started" }, "knowledge_base": { "display_name": "Knowledge Base Out", "type": "object", "is_handle": true } } }, "resources": { "cpu": 2.0, "memory": "1Gi", "gpu": "none" } }""" import os from typing import Any, Dict from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings from llama_index.readers.web import SimpleWebPageReader from llama_index.embeddings.huggingface import HuggingFaceEmbedding def process_knowledge_base(kb_name: str, source_type: str, path_or_url: str) -> Dict[str, Any]: """ Creates and persists a LlamaIndex VectorStoreIndex. 
""" # Use a high-quality, local model for embeddings Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") if source_type == "URL": documents = SimpleWebPageReader(html_to_text=True).load_data([path_or_url]) else: documents = SimpleDirectoryReader(input_dir=path_or_url).load_data() index = VectorStoreIndex.from_documents(documents) storage_path = os.path.join("./storage", kb_name) index.storage_context.persist(persist_dir=storage_path) # Return a reference object to the persisted index return {"knowledge_base": {"name": kb_name, "path": storage_path}} ## rag node 2 query """{ "id": "RAGQuery-1", "type": "RAGQuery", "data": { "display_name": "Retrieve & Augment Prompt", "template": { "query": { "display_name": "Original Query", "type": "string", "is_handle": true }, "knowledge_base": { "display_name": "Knowledge Base", "type": "object", "is_handle": true }, "rag_prompt": { "display_name": "Augmented Prompt Out", "type": "string", "is_handle": true } } }, "resources": { "cpu": 1.0, "memory": "512Mi", "gpu": "none" } }""" from typing import Any, Dict from llama_index.core import StorageContext, load_index_from_storage, Settings from llama_index.embeddings.huggingface import HuggingFaceEmbedding def process_rag_query(query: str, knowledge_base: Dict) -> Dict[str, Any]: """ Retrieves context from a knowledge base and creates an augmented prompt. """ Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") # Load the index from the path provided by the KnowledgeBase node storage_context = StorageContext.from_defaults(persist_dir=knowledge_base['path']) index = load_index_from_storage(storage_context) retriever = index.as_retriever(similarity_top_k=3) retrieved_nodes = retriever.retrieve(query) # Combine the retrieved text into a single context block context_str = "\n\n".join([node.get_content() for node in retrieved_nodes]) # Construct the final prompt for the ChatModel rag_prompt_template = ( "Use the following context to answer the question. " "If the answer is not in the context, say you don't know.\n\n" "Context:\n{context}\n\n" "Question: {question}" ) final_prompt = rag_prompt_template.format(context=context_str, question=query) return {"rag_prompt": final_prompt} # --- Demo Execution --- if __name__ == "__main__": # 1. Build the KB from Modal docs kb_result = process_knowledge_base( kb_name="product-docs-v1", source_type="URL", path_or_url="https://modal.com/docs/guide" ) print("Knowledge Base Created:", kb_result) # 2. Run a RAG query user_query = "How do I get started with Modal?" 
rag_result = process_rag_query(user_query, kb_result["knowledge_base"]) print("\nAugmented RAG Prompt:\n", rag_result["rag_prompt"]) ## speech to text """{ "id": "HFSpeechToText-1", "type": "HFSpeechToText", "data": { "display_name": "Transcribe Audio (Whisper)", "template": { "model_id": { "display_name": "Model ID", "type": "string", "value": "openai/whisper-large-v3" }, "audio_input": { "display_name": "Audio Input", "type": "object", "is_handle": true }, "transcribed_text": { "display_name": "Transcribed Text", "type": "string", "is_handle": true } } }, "resources": { "cpu": 1.0, "memory": "4Gi", "gpu": "T4" } }""" import torch from transformers import pipeline from typing import Any, Dict # --- In a real Modal app, this would be structured like this: --- # # import modal # image = modal.Image.debian_slim().pip_install("transformers", "torch", "librosa") # stub = modal.Stub("speech-to-text-model") # # @stub.cls(gpu="T4", image=image) # class WhisperModel: # def __init__(self): # device = "cuda" if torch.cuda.is_available() else "cpu" # self.pipe = pipeline( # "automatic-speech-recognition", # model="openai/whisper-large-v3", # torch_dtype=torch.float16, # device=device, # ) # # @modal.method() # def run_inference(self, audio_path): # # The function logic from below would be here. # ... # ------------------------------------------------------------------- def process_hf_speech_to_text(model_id: str, audio_input: Dict[str, Any]) -> Dict[str, Any]: """ Transcribes an audio file using a Hugging Face ASR pipeline. NOTE: This function simulates the inference part of a stateful Modal class. The model pipeline should be loaded only once. """ if audio_input.get("type") != "audio": raise ValueError("Input must be of type 'audio'.") audio_path = audio_input["value"] # --- This part would be inside the Modal class method --- # In a real implementation, 'pipe' would be a class attribute (self.pipe) # loaded in the __init__ or @enter method. device = "cuda" if torch.cuda.is_available() else "cpu" pipe = pipeline( "automatic-speech-recognition", model=model_id, torch_dtype=torch.float16, device=device, ) outputs = pipe( audio_path, chunk_length_s=30, batch_size=24, return_timestamps=True, ) return {"transcribed_text": outputs["text"]} ## text to speech """{ "id": "HFTextToSpeech-1", "type": "HFTextToSpeech", "data": { "display_name": "Generate Speech", "template": { "model_id": { "display_name": "Model ID", "type": "string", "value": "microsoft/speecht5_tts" }, "text_input": { "display_name": "Text Input", "type": "string", "is_handle": true }, "audio_output": { "display_name": "Audio Output", "type": "object", "is_handle": true } } }, "resources": { "cpu": 1.0, "memory": "4Gi", "gpu": "T4" } }""" import torch from transformers import pipeline import soundfile as sf from typing import Any, Dict def process_hf_text_to_speech(model_id: str, text_input: str) -> Dict[str, Any]: """ Synthesizes speech from text using a Hugging Face TTS pipeline. NOTE: Simulates the inference part of a stateful Modal class. """ # --- This part would be inside the Modal class method --- # The pipeline and embeddings would be loaded once in the class. 
pipe = pipeline("text-to-speech", model=model_id, device="cuda") # SpeechT5 requires speaker embeddings for voice characteristics from transformers import SpeechT5HifiGan vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan").to("cuda") # A dummy embedding for a generic voice import numpy as np speaker_embedding = np.random.rand(1, 512).astype(np.float32) speech = pipe(text_input, forward_params={"speaker_embeddings": speaker_embedding}) # Save the output to a file and return the path output_path = "/tmp/output.wav" sf.write(output_path, speech["audio"], samplerate=speech["sampling_rate"]) return {"audio_output": {"type": "audio", "value": output_path}} ## text generation """{ "id": "HFTextGeneration-1", "type": "HFTextGeneration", "data": { "display_name": "Generate with Mistral", "template": { "model_id": { "display_name": "Model ID", "type": "string", "value": "mistralai/Mistral-7B-Instruct-v0.2" }, "max_new_tokens": { "display_name": "Max New Tokens", "type": "number", "value": 256 }, "prompt": { "display_name": "Prompt", "type": "string", "is_handle": true }, "generated_text": { "display_name": "Generated Text", "type": "string", "is_handle": true } } }, "resources": { "cpu": 2.0, "memory": "24Gi", "gpu": "A10G" } }""" import torch from transformers import pipeline from typing import Any, Dict def process_hf_text_generation(model_id: str, prompt: str, max_new_tokens: int) -> Dict[str, Any]: """ Generates text from a prompt using a Hugging Face LLM. NOTE: Simulates the inference part of a stateful Modal class. """ # --- This part would be inside the Modal class method --- # The pipeline is loaded once on container start. pipe = pipeline( "text-generation", model=model_id, torch_dtype=torch.bfloat16, device_map="auto", ) messages = [{"role": "user", "content": prompt}] # The pipeline needs the prompt to be formatted correctly for instruct models formatted_prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) outputs = pipe( formatted_prompt, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.7, top_k=50, top_p=0.95, ) # Extract only the generated part of the text generated_text = outputs[0]["generated_text"] # The output includes the prompt, so we remove it. response_text = generated_text[len(formatted_prompt):] return {"generated_text": response_text} ## image generation """{ "id": "HFImageGeneration-1", "type": "HFImageGeneration", "data": { "display_name": "Generate Image (SDXL)", "template": { "model_id": { "display_name": "Base Model ID", "type": "string", "value": "stabilityai/stable-diffusion-xl-base-1.0" }, "lora_id": { "display_name": "LoRA Model ID (Optional)", "type": "string", "value": "nerijs/pixel-art-xl" }, "prompt": { "display_name": "Prompt", "type": "string", "is_handle": true }, "image_output": { "display_name": "Image Output", "type": "object", "is_handle": true } } }, "resources": { "cpu": 2.0, "memory": "24Gi", "gpu": "A10G" } }""" import torch from diffusers import StableDiffusionXLPipeline from typing import Any, Dict def process_hf_image_generation(model_id: str, prompt: str, lora_id: str = None) -> Dict[str, Any]: """ Generates an image using a Stable Diffusion pipeline, with optional LoRA. NOTE: Simulates the inference part of a stateful Modal class. """ # --- This part would be inside the Modal class method --- # The base pipeline is loaded once. 
pipe = StableDiffusionXLPipeline.from_pretrained( model_id, torch_dtype=torch.float16, variant="fp16", use_safetensors=True ).to("cuda") # If a LoRA is specified, load and fuse it. # In a real app, this logic would be more complex to handle multiple LoRAs. if lora_id: pipe.load_lora_weights(lora_id) pipe.fuse_lora() # Generate the image image = pipe(prompt=prompt).images[0] output_path = "/tmp/generated_image.png" image.save(output_path) return {"image_output": {"type": "image", "value": output_path}} ## captioning image to text """{ "id": "HFVisionModel-1", "type": "HFVisionModel", "data": { "display_name": "Describe Image", "template": { "task": { "display_name": "Task", "type": "options", "options": ["image-to-text"], "value": "image-to-text" }, "model_id": { "display_name": "Model ID", "type": "string", "value": "Salesforce/blip-image-captioning-large" }, "image_input": { "display_name": "Image Input", "type": "object", "is_handle": true }, "result": { "display_name": "Result", "type": "string", "is_handle": true } } }, "resources": { "cpu": 1.0, "memory": "8Gi", "gpu": "T4" } }""" from transformers import pipeline from PIL import Image from typing import Any, Dict def process_hf_vision_model(task: str, model_id: str, image_input: Dict[str, Any]) -> Dict[str, Any]: """ Performs a vision-based task, like image captioning. NOTE: Simulates the inference part of a stateful Modal class. """ if image_input.get("type") != "image": raise ValueError("Input must be of type 'image'.") image_path = image_input["value"] # --- This part would be inside the Modal class method --- # The pipeline is loaded once. pipe = pipeline(task, model=model_id, device="cuda") # Open the image file image = Image.open(image_path) result = pipe(image) # The output format for this pipeline is a list of dicts # e.g., [{'generated_text': 'a cat sitting on a couch'}] output_text = result[0]['generated_text'] return {"result": output_text} import os from openai import OpenAI client = OpenAI( base_url="https://api.studio.nebius.com/v1/", api_key=os.environ.get("NEBIUS_API_KEY") ) response = client.images.generate( model="black-forest-labs/flux-dev", response_format="b64_json", extra_body={ "response_extension": "png", "width": 1024, "height": 1024, "num_inference_steps": 28, "negative_prompt": "", "seed": -1 }, prompt="pokemon" ) print(response.to_json()) ## nebius image generation """{ "id": "NebiusImage-1", "type": "NebiusImage", "data": { "display_name": "Nebius Image Generation", "template": { "model": { "display_name": "Model", "type": "options", "options": [ "black-forest-labs/flux-dev", "black-forest-labs/flux-schnell", "stability-ai/sdxl" ], "value": "black-forest-labs/flux-dev" }, "api_key": { "display_name": "Nebius API Key", "type": "SecretStr", "required": true, "env_var": "NEBIUS_API_KEY" }, "prompt": { "display_name": "Prompt", "type": "string", "is_handle": true }, "negative_prompt": { "display_name": "Negative Prompt (Optional)", "type": "string", "value": "" }, "width": { "display_name": "Width", "type": "number", "value": 1024 }, "height": { "display_name": "Height", "type": "number", "value": 1024 }, "num_inference_steps": { "display_name": "Inference Steps", "type": "number", "value": 28 }, "seed": { "display_name": "Seed", "type": "number", "value": -1 }, "image_output": { "display_name": "Image Output", "type": "object", "is_handle": true } } }, "resources": { "cpu": 0.2, "memory": "256Mi", "gpu": "none" } }""" import os import base64 from typing import Any, Dict from openai import OpenAI def 
process_nebius_image( model: str, api_key: str, prompt: str, negative_prompt: str = "", width: int = 1024, height: int = 1024, num_inference_steps: int = 28, seed: int = -1 ) -> Dict[str, Any]: """ Generates an image using the Nebius AI Studio API. """ if not api_key: raise ValueError("Nebius API key is missing.") client = OpenAI( base_url="https://api.studio.nebius.com/v1/", api_key=api_key ) try: response = client.images.generate( model=model, response_format="b64_json", prompt=prompt, extra_body={ "response_extension": "png", "width": width, "height": height, "num_inference_steps": num_inference_steps, "negative_prompt": negative_prompt, "seed": seed } ) # Extract the base64 encoded string b64_data = response.data[0].b64_json # Decode the string and save the image to a file image_bytes = base64.b64decode(b64_data) output_path = "/tmp/nebius_image.png" with open(output_path, "wb") as f: f.write(image_bytes) # Return a data package with the path to the generated image return {"image_output": {"type": "image", "value": output_path}} except Exception as e: print(f"Error calling Nebius API: {e}") return {"image_output": {"error": str(e)}} ## mcp new """{ "id": "MCPConnection-1", "type": "MCPConnection", "data": { "display_name": "MCP Server Connection", "template": { "server_url": { "display_name": "MCP Server URL", "type": "string", "value": "http://localhost:8000/sse", "info": "URL to MCP server (HTTP/SSE or stdio command)" }, "connection_type": { "display_name": "Connection Type", "type": "dropdown", "options": ["http", "stdio"], "value": "http" }, "allowed_tools": { "display_name": "Allowed Tools (Optional)", "type": "list", "info": "Filter specific tools. Leave empty for all tools" }, "api_key": { "display_name": "API Key (Optional)", "type": "SecretStr", "env_var": "MCP_API_KEY" }, "mcp_tools_output": { "display_name": "MCP Tools Output", "type": "list", "is_handle": true } } }, "resources": { "cpu": 0.1, "memory": "128Mi", "gpu": "none" } } """ """{ "id": "MCPAgent-1", "type": "MCPAgent", "data": { "display_name": "MCP-Powered AI Agent", "template": { "mcp_tools_input": { "display_name": "MCP Tools Input", "type": "list", "is_handle": true }, "llm_model": { "display_name": "LLM Model", "type": "dropdown", "options": ["gpt-4", "gpt-3.5-turbo", "gpt-4o", "gpt-4o-mini"], "value": "gpt-4o-mini" }, "system_prompt": { "display_name": "System Prompt", "type": "text", "value": "You are a helpful AI assistant with access to various tools. 
Use the available tools to help answer user questions accurately.", "multiline": true }, "user_query": { "display_name": "User Query", "type": "string", "is_handle": true }, "max_iterations": { "display_name": "Max Iterations", "type": "int", "value": 10 }, "agent_response": { "display_name": "Agent Response", "type": "string", "is_handle": true } } }, "resources": { "cpu": 0.5, "memory": "512Mi", "gpu": "none" } } """ import asyncio import os from typing import List, Optional, Dict, Any from llama_index.tools.mcp import BasicMCPClient, McpToolSpec, get_tools_from_mcp_url, aget_tools_from_mcp_url from llama_index.core.tools import FunctionTool class MCPConnectionNode: """Node to connect to MCP servers and retrieve tools""" def __init__(self): self.client = None self.tools = [] async def execute(self, server_url: str, connection_type: str = "http", allowed_tools: Optional[List[str]] = None, api_key: Optional[str] = None) -> Dict[str, Any]: """ Connect to MCP server and retrieve available tools """ try: # Set API key if provided if api_key: os.environ["MCP_API_KEY"] = api_key print(f"๐Ÿ”Œ Connecting to MCP server: {server_url}") if connection_type == "http": # Use LlamaIndex's built-in function to get tools[2] tools = await aget_tools_from_mcp_url( server_url, allowed_tools=allowed_tools ) else: # For stdio connections self.client = BasicMCPClient(server_url) mcp_tool_spec = McpToolSpec( client=self.client, allowed_tools=allowed_tools ) tools = await mcp_tool_spec.to_tool_list_async() self.tools = tools print(f"โœ… Successfully connected! Retrieved {len(tools)} tools:") for tool in tools: print(f" - {tool.metadata.name}: {tool.metadata.description}") return { "success": True, "tools_count": len(tools), "tool_names": [tool.metadata.name for tool in tools], "mcp_tools_output": tools } except Exception as e: print(f"โŒ Connection failed: {str(e)}") return { "success": False, "error": str(e), "mcp_tools_output": [] } # Example usage async def mcp_connection_demo(): node = MCPConnectionNode() # Using a public MCP server (you'll need to replace with actual public servers) result = await node.execute( server_url="http://localhost:8000/sse", # Replace with public MCP server connection_type="http", allowed_tools=None # Get all tools ) return result from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner from llama_index.llms.openai import OpenAI from llama_index.core.tools import FunctionTool from typing import List, Dict, Any import os class MCPAgentNode: """Node to create and run MCP-powered AI agents""" def __init__(self): self.agent = None self.tools = [] async def execute(self, mcp_tools_input: List[FunctionTool], user_query: str, llm_model: str = "gpt-4o-mini", system_prompt: str = "You are a helpful AI assistant.", max_iterations: int = 10) -> Dict[str, Any]: """ Create and run MCP-powered agent using FunctionCallingAgent """ try: if not mcp_tools_input: return { "success": False, "error": "No MCP tools provided", "agent_response": "No tools available to process the query." 
} print(f"๐Ÿค– Creating agent with {len(mcp_tools_input)} tools...") # Initialize LLM[1] llm = OpenAI( model=llm_model, api_key=os.getenv("OPENAI_API_KEY"), temperature=0.1 ) # Create function calling agent (more reliable than ReAct)[2] agent_worker = FunctionCallingAgentWorker.from_tools( tools=mcp_tools_input, llm=llm, verbose=True, system_prompt=system_prompt ) self.agent = AgentRunner(agent_worker) print(f"๐Ÿ’ญ Processing query: {user_query}") # Execute the query response = self.agent.chat(user_query) return { "success": True, "agent_response": str(response.response), "user_query": user_query, "tools_used": len(mcp_tools_input) } except Exception as e: print(f"โŒ Agent execution failed: {str(e)}") return { "success": False, "error": str(e), "agent_response": f"Sorry, I encountered an error while processing your query: {str(e)}" } # Example usage async def mcp_agent_demo(tools: List[FunctionTool]): node = MCPAgentNode() result = await node.execute( mcp_tools_input=tools, user_query="What tools do you have available and what can you help me with?", llm_model="gpt-4o-mini", system_prompt="You are a helpful AI assistant. Use your available tools to provide accurate and useful responses." ) return result example import asyncio import os from typing import List, Dict, Any from llama_index.core.tools import FunctionTool from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner from llama_index.llms.openai import OpenAI class CompleteMCPWorkflowDemo: """Complete demo of MCP workflow with connection and agent nodes""" def __init__(self): self.connection_node = MCPConnectionNode() self.agent_node = MCPAgentNode() # Set your OpenAI API key # os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here" async def create_mock_mcp_tools(self) -> List[FunctionTool]: """ Create mock MCP tools that simulate a real MCP server Replace this with actual MCP server connection when available """ def get_weather(city: str, country: str = "US") -> str: """Get current weather information for a city""" weather_data = { "london": "Cloudy, 15ยฐC, humidity 80%", "paris": "Sunny, 22ยฐC, humidity 45%", "tokyo": "Rainy, 18ยฐC, humidity 90%", "new york": "Partly cloudy, 20ยฐC, humidity 55%" } result = weather_data.get(city.lower(), f"Weather data not available for {city}") return f"Weather in {city}, {country}: {result}" def search_news(topic: str, limit: int = 5) -> str: """Search for latest news on a given topic""" news_items = [ f"Breaking: New developments in {topic}", f"Analysis: {topic} trends for 2025", f"Expert opinion on {topic} industry changes", f"Research shows {topic} impact on society", f"Global {topic} market outlook" ] return f"Top {limit} news articles about {topic}:\n" + "\n".join(news_items[:limit]) def calculate_math(expression: str) -> str: """Calculate mathematical expressions safely""" try: # Simple and safe evaluation allowed_chars = "0123456789+-*/().,_ " if all(c in allowed_chars for c in expression): result = eval(expression) return f"Result: {expression} = {result}" else: return f"Invalid expression: {expression}" except Exception as e: return f"Error calculating {expression}: {str(e)}" def get_company_info(company: str) -> str: """Get basic company information""" companies = { "openai": "OpenAI - AI research company, creator of GPT models", "microsoft": "Microsoft - Technology corporation, cloud computing and software", "google": "Google - Search engine and technology company", "amazon": "Amazon - E-commerce and cloud computing platform" } return companies.get(company.lower(), 
f"Company information not found for {company}") # Convert to FunctionTool objects[2] tools = [ FunctionTool.from_defaults(fn=get_weather), FunctionTool.from_defaults(fn=search_news), FunctionTool.from_defaults(fn=calculate_math), FunctionTool.from_defaults(fn=get_company_info) ] return tools async def run_complete_workflow(self): """ Run the complete MCP workflow demonstration """ print("๐Ÿš€ Starting Complete MCP Workflow Demo") print("=" * 60) # Step 1: Setup MCP Connection (simulated) print("\n๐Ÿ“ก Step 1: Setting up MCP Connection...") # In real implementation, this would connect to actual MCP server mock_tools = await self.create_mock_mcp_tools() connection_result = { "success": True, "tools_count": len(mock_tools), "tool_names": [tool.metadata.name for tool in mock_tools], "mcp_tools_output": mock_tools } if connection_result["success"]: print(f"โœ… MCP Connection successful!") print(f"๐Ÿ“‹ Retrieved {connection_result['tools_count']} tools:") for tool_name in connection_result['tool_names']: print(f" - {tool_name}") else: print(f"โŒ MCP Connection failed: {connection_result.get('error')}") return # Step 2: Create and test MCP Agent print(f"\n๐Ÿค– Step 2: Creating MCP-Powered Agent...") test_queries = [ "What's the weather like in London?", "Search for news about artificial intelligence", "Calculate 15 * 8 + 32", "Tell me about OpenAI company", "What tools do you have and what can you help me with?" ] for i, query in enumerate(test_queries, 1): print(f"\n๐Ÿ’ฌ Query {i}: {query}") print("-" * 40) agent_result = await self.agent_node.execute( mcp_tools_input=connection_result["mcp_tools_output"], user_query=query, llm_model="gpt-4o-mini", system_prompt="""You are a helpful AI assistant with access to weather, news, calculation, and company information tools. When a user asks a question: 1. Determine which tool(s) can help answer their question 2. Use the appropriate tool(s) to gather information 3. Provide a clear, helpful response based on the tool results Always be informative and explain what tools you used.""", max_iterations=5 ) if agent_result["success"]: print(f"๐ŸŽฏ Agent Response:") print(f"{agent_result['agent_response']}") else: print(f"โŒ Agent Error: {agent_result['error']}") print("\n" + "="*50) # Function to connect to real MCP servers when available async def connect_to_real_mcp_server(server_url: str): """ Example of connecting to a real MCP server Replace server_url with actual public MCP servers """ try: from llama_index.tools.mcp import aget_tools_from_mcp_url print(f"๐Ÿ”Œ Attempting to connect to: {server_url}") tools = await aget_tools_from_mcp_url(server_url) print(f"โœ… Connected successfully! 
Found {len(tools)} tools:") for tool in tools: print(f" - {tool.metadata.name}: {tool.metadata.description}") return tools except Exception as e: print(f"โŒ Failed to connect to {server_url}: {e}") return [] # Main execution async def main(): """Run the complete demo""" # Option 1: Run with mock tools (works immediately) print("๐ŸŽฎ Running MCP Workflow Demo with Mock Tools") demo = CompleteMCPWorkflowDemo() await demo.run_complete_workflow() # Option 2: Try connecting to real MCP servers (uncomment when available) # real_servers = [ # "http://your-mcp-server.com:8000/sse", # "https://api.example.com/mcp" # ] # # for server_url in real_servers: # tools = await connect_to_real_mcp_server(server_url) # if tools: # # Use real tools with agent # agent_node = MCPAgentNode() # result = await agent_node.execute( # mcp_tools_input=tools, # user_query="What can you help me with?", # llm_model="gpt-4o-mini" # ) # print(f"Real MCP Agent Response: {result}") if __name__ == "__main__": asyncio.run(main())
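The MCP import list above also pulls in the synchronous `get_tools_from_mcp_url`, which the demo never uses. For quick scripts that don't want to manage an event loop, a minimal sketch (assuming the synchronous helper mirrors the async one used above; the server URL is a placeholder):

from llama_index.tools.mcp import get_tools_from_mcp_url

def list_mcp_tools_sync(server_url: str = "http://localhost:8000/sse"):
    """Fetch MCP tools without asyncio and print their names."""
    tools = get_tools_from_mcp_url(server_url)
    for tool in tools:
        print(f"- {tool.metadata.name}: {tool.metadata.description}")
    return tools

# list_mcp_tools_sync()  # uncomment when an MCP server is reachable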