# File size: 5,697 Bytes
# 4f1f407
import base64
from mimetypes import guess_type
from dotenv import load_dotenv
from typing import TypedDict, Annotated, List
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition
from langchain_tavily import TavilySearch
from langchain_community.tools import RequestsGetTool
from langchain_community.utilities.requests import TextRequestsWrapper
from openai import OpenAI, audio
import pandas as pd
from langchain_experimental.tools.python.tool import PythonREPLTool
# Load settings from a local .env file into the process environment
# before any client below is constructed.
load_dotenv()

# Candidate OpenAI model identifiers; only gpt3 is passed to ChatOpenAI below.
gpt1 = "gpt-4o"
gpt2 = "gpt-4.1-2025-04-14"
gpt3 = "o3-mini"

# Chat model shared by the rest of this module.
model = ChatOpenAI(model=gpt3)
def integer_comparison(numb1: int, numb2: int) -> int:
    """
    Compare two integer numbers.
    Given input parameters
    * numb1: the first integer number,
    * numb2: the second integer number,
    This function returns
    * 0 when numb1 and numb2 are equal
    * 1 when numb1 is strictly bigger than numb2
    * -1 when numb1 is strictly smaller than numb2
    """
    # Equality is settled first; whatever is left is decided by one comparison.
    if numb1 == numb2:
        return 0
    return 1 if numb1 > numb2 else -1
def local_image_to_data_url(image_path: str) -> str:
    """Encode a local file as a base64 ``data:`` URL.

    The MIME type is guessed from the file name; when it cannot be guessed,
    "application/octet-stream" is used instead.
    """
    guessed_type = guess_type(image_path)[0]
    if guessed_type is None:
        mime_type = "application/octet-stream"
    else:
        mime_type = guessed_type

    # Read the raw bytes and turn them into ASCII-safe base64 text.
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read()).decode("utf-8")

    return f"data:{mime_type};base64,{encoded}"
def describe_a_photo(file: str) -> str:
    """
    Describe an image stored in the assets folder.
    Given input parameters
    * file: file name of an image to be described in detail,
    This function returns
    * A string containing the description of the image
    """
    # The image is read from assets/ and inlined into the request as a data URL.
    data_url = local_image_to_data_url(f"assets/{file}")
    client = OpenAI()
    messages = [
        {
            "role": "user",
            # When "content" is a list, every part has to be a typed object;
            # the instruction text therefore goes in a {"type": "text"} part
            # rather than a bare string.
            "content": [
                {
                    "type": "text",
                    "text": "Describe what you see in this image:",
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": data_url,
                        "detail": "auto",  # optional: "low", "high", or "auto"
                    },
                },
            ],
        }
    ]
    resp = client.chat.completions.create(model="gpt-4o", messages=messages)
    return resp.choices[0].message.content
def transcript_an_audio(file: str) -> str:
    """
    Transcribe an audio file stored in the assets folder.
    Given input parameters
    * file: file name of the audio to be transcribed
    This function returns
    * A string containing the transcription of the audio file
    """
    audio_path = f"assets/{file}"
    with open(audio_path, "rb") as audio_stream:
        # Optional request arguments that are left unset here:
        # prompt="...", response_format="verbose_json", temperature=0, language="en"
        transcription = audio.transcriptions.create(
            model="whisper-1",
            file=audio_stream,
        )
    return transcription.text
def read_an_excel(file: str) -> str:
    """
    Read an excel file stored in the assets folder.
    Given input parameters
    * file: file name of an excel file to be attached
    This function returns
    * A string containing the excel rows as text using json format
      (a json array with one object per row, keyed by column name)
    """
    df = pd.read_excel(f"assets/{file}")
    # Serialize with to_json so the result really is JSON: empty cells become
    # null and dates become ISO-8601 strings. str() of the record list would
    # give a Python repr (single quotes, nan, Timestamp(...)) instead.
    # force_ascii=False keeps non-ASCII cell text readable.
    return df.to_json(orient="records", date_format="iso", force_ascii=False)
def load_python_script(file: str) -> str:
    """
    Load a python script stored in the assets folder.
    Given input parameters
    * file: file name of a python script file to be executed
    This function returns
    * A string containing the file content, the python script
    """
    # Read as text so the caller gets the source itself. Reading bytes and
    # calling str() on them yields the repr "b'...'" with escaped newlines,
    # which is not runnable code. errors="replace" keeps the function from
    # raising on stray non-UTF-8 bytes.
    with open(f"assets/{file}", encoding="utf-8", errors="replace") as f:
        return f.read()
# HTTP GET tool: a default text wrapper (headers/proxy can be customized if
# needed) plus the explicit allow_dangerous_requests opt-in.
requests_wrapper = TextRequestsWrapper()
# noinspection PyArgumentList
visit_tool = RequestsGetTool(
    requests_wrapper=requests_wrapper,
    allow_dangerous_requests=True,  # PyCharm may flag this, ignore inspection
)

# Everything the agent is allowed to call: web search, page fetch, the local
# helper functions of this module, and a Python REPL.
tools = [
    TavilySearch(max_results=5),
    visit_tool,
    integer_comparison,
    describe_a_photo,
    transcript_an_audio,
    read_an_excel,
    load_python_script,
    PythonREPLTool(),
]

# Alternative binding: model.bind_tools(tools, parallel_tool_calls=False)
llm_with_tools = model.bind_tools(tools)
# Generate the AgentState and Agent graph
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Conversation messages. add_messages is attached as Annotated metadata;
    # presumably LangGraph uses it as the reducer that merges each node's
    # returned messages into this list - confirm against the LangGraph docs.
    messages: Annotated[List[AnyMessage], add_messages]
def assistant(state: AgentState):
    """Run the tool-enabled LLM on the messages held in the state.

    Returns a state update whose "messages" list holds the single reply
    produced by the model.
    """
    reply = llm_with_tools.invoke(state["messages"])
    return {"messages": [reply]}
def create_and_compile_oai_agent():
    """Build and compile the agent graph.

    The graph has two nodes: "assistant", which calls the tool-enabled LLM,
    and "tools", a ToolNode over the module-level `tools` list. Execution
    starts at "assistant"; `tools_condition` decides after each assistant
    turn whether to route to "tools", and every tool run flows back to
    "assistant".

    Returns the object produced by `StateGraph.compile()`.
    """
    # No OpenAI client is created here: building the graph needs no API
    # round trip, and the model is already configured at module level.
    builder = StateGraph(AgentState)

    # Nodes: these do the work.
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))

    # Edges: these determine how the control flow moves.
    builder.add_edge(START, "assistant")
    # If the latest message requires a tool, route to tools;
    # otherwise, provide a direct response.
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")

    return builder.compile()
|