# FinalAssignment / tools.py
import os
import base64
import requests
import tempfile
import pandas as pd
from openai import OpenAI
from pytubefix import YouTube
from langchain_community.tools import tool
from bs4 import BeautifulSoup, ResultSet, PageElement, Tag, NavigableString
@tool
def default_file_reader(file_path: str) -> str | None:
"""
Default file reader tool that opens a file as a text reads it content and return it as a string.
Use this default tool if there is no specific file reader for a given file.
"""
try:
with open(file_path, 'r') as file:
return file.read()
except FileNotFoundError as e:
print(f"Error:{e}")
return None
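# Usage sketch (an illustration, not part of the original assignment code): functions
# decorated with @tool become LangChain tools, so they are invoked through the Runnable
# interface rather than called directly, e.g.
#   default_file_reader.invoke({"file_path": "notes.txt"})
# where "notes.txt" is a hypothetical local text file.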
@tool
def image_reader(file_path: str) -> dict[str, str | dict[str, str]] | None:
"""
Opens and png image and returns it's data as a dictionary.
"""
try:
with open(file_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode('utf-8')
return {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}}
except FileNotFoundError as e:
print(f"Error:{e}")
return None
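# Note (assumption about intended use): the returned dict mirrors the "image_url"
# content-part format of multimodal chat messages, so the agent can append it to a
# message's content list alongside a text part.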
@tool
def excel_column_reader(file_path: str) -> str | None:
"""
Opens an Excel file, reads the first row to get the names of the columns and return it as a string.
Use it to find out what data is available in the Excel file.
"""
try:
df = pd.read_excel(file_path)
return ' '.join(df.columns.astype(str))
except FileNotFoundError as e:
print(f"Error:{e}")
return None
@tool
def excel_find_column_values_sum(file_path: str, columns: list[str]) -> float | None:
    """
    Opens an Excel file, finds the specified columns by name, and returns the total sum
    of all numeric cells in those columns.
    """
    try:
        total = 0
        df = pd.read_excel(file_path)
        for column in columns:
            total += df[column].sum()
        return total
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return None
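# Example invocation (hypothetical file and column names, for illustration only):
#   excel_find_column_values_sum.invoke({"file_path": "sales.xlsx", "columns": ["Burgers", "Fries"]})
# would return the combined sum of the numeric cells in those two columns.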
@tool
def wiki_search(query: str) -> str | None:
"""
Search wikipedia by query string and return content of the first found page.
Also use it to get information about shows and actors.
"""
try:
ddg_results = []
wiki_results = ""
link_rows = _fetch_ddg_search_result_links(f"wikipedia {query}")
        # Keep only the first en.wikipedia.org result, fetch its page text, then stop.
        for link_row in link_rows:
            if 'en.wikipedia.org' not in link_row.attrs['href']:
                continue
ddg_results.append({
'title': link_row.get_text(strip=True),
'url': link_row.attrs['href']
})
wiki_results += _fetch_specific_page(link_row.attrs['href'])
if len(ddg_results) == 1:
break
return wiki_results
except requests.exceptions.RequestException as e:
print(f"Error during request: {e}")
return None
except Exception as e:
print(f"Error parsing results: {e}")
return None
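# Example invocation (illustrative query only):
#   wiki_search.invoke({"query": "Eiffel Tower"})
# returns the plain text of the first en.wikipedia.org result found via DuckDuckGo Lite.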
@tool
def archive_search(query: str) -> str | None:
"""
Search archive.org by query string and return content of the first found page.
Use this search when you need to find scientific paper or specific scientific publication detail.
"""
try:
ddg_results = []
archive_results = ""
link_rows = _fetch_ddg_search_result_links(f"archive.org {query}")
        # Keep only the first archive.org result, fetch its page text, then stop.
        for link_row in link_rows:
            if 'archive.org' not in link_row.attrs['href']:
                continue
ddg_results.append({
'title': link_row.get_text(strip=True),
'url': link_row.attrs['href']
})
archive_results += _fetch_specific_page(link_row.attrs['href'])
if len(ddg_results) == 1:
break
return archive_results
except requests.exceptions.RequestException as e:
print(f"Error during request: {e}")
return None
except Exception as e:
print(f"Error parsing results: {e}")
return None
@tool
def get_ioc_code(country_name: str) -> str | None:
"""
Accepts country name as a string and returns IOC code of this country.
"""
try:
        ioc_df = pd.read_html('https://en.wikipedia.org/wiki/List_of_IOC_country_codes')[0]
        # The scraped 'Code' cells can carry extra leading characters, so keep only the
        # trailing three letters, which form the IOC code itself.
        ioc_df['Code'] = ioc_df['Code'].str[-3:]
        name_to_code = dict(zip(ioc_df['National Olympic Committee'], ioc_df['Code']))
return name_to_code.get(country_name)
except Exception as e:
print(f"Error: {e}")
return None
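# Example (assuming the Wikipedia table lists the committee simply as "Japan"):
#   get_ioc_code.invoke({"country_name": "Japan"})  # expected to return "JPN"
# If the name does not exactly match a table entry, the tool returns None.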
@tool
def check_commutativity(table_definition: str) -> str | None:
"""
Use this tool if you need to verify whether a binary operation defined by a table is commutative.
Returns dictionary with two fields: "is_commutative"(boolean) and
"counter_example_elements" list of elements that violates x∗y=y∗x that prove * is not commutative
Example of table definition:
|*|a|b|c|d|e|
|---|---|---|---|---|---|
|a|a|b|c|b|d|
|b|b|c|a|e|c|
|c|c|a|b|b|a|
|d|b|e|b|e|d|
|e|d|b|a|d|c|
"""
lines = [line.strip() for line in table_definition.strip().splitlines() if
line.strip().startswith('|') and not line.strip().startswith('|-')]
# Parse header: skip the '*' cell
header_cells = [cell.strip() for cell in lines[0].split('|')[1:] if cell.strip()]
S = header_cells[1:] # Skip the first header cell which is "*"
operation_table = {}
for row in lines[1:]:
cells = [cell.strip() for cell in row.split('|')[1:] if cell.strip()]
row_label = cells[0]
values = cells[1:]
if len(values) != len(S):
raise ValueError(f"Row {row_label} does not have the correct number of entries.")
operation_table[row_label] = dict(zip(S, values))
counter_example_elements = set()
for x in S:
for y in S:
if operation_table[x][y] != operation_table[y][x]:
counter_example_elements.update([x, y])
return ', '.join(sorted(counter_example_elements)) if len(counter_example_elements) > 0 else None
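# For the example table in the docstring above, the only failing pair is b*e != e*b,
# so the tool returns "b, e"; a fully commutative table yields None.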
@tool
def audio_to_text(file_path: str) -> str | None:
"""
Transcribes audio file to text and returns text as a string.
"""
try:
client = OpenAI()
audio_file = open(file_path, "rb")
transcription = client.audio.transcriptions.create(
model="gpt-4o-transcribe",
file=audio_file
)
return transcription.text
except Exception as e:
print(f"Error: {e}")
return None
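# Note: OpenAI() reads the API key from the OPENAI_API_KEY environment variable, so this
# tool (and video_to_text below) assumes that variable is set before the agent runs.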
@tool
def video_to_text(video_url: str) -> str | None:
"""
Downloads YouTube video by url, transcribes it to text and returns text as a string.
"""
file_path = ""
try:
ytx = YouTube(video_url)
temp_dir = tempfile.gettempdir()
ysx = ytx.streams.get_highest_resolution()
file_path = ysx.download(output_path=temp_dir)
client = OpenAI()
video_file = open(file_path, "rb")
transcription = client.audio.transcriptions.create(
model="gpt-4o-transcribe",
file=video_file,
temperature=0.0,
prompt="Ignore music playing in the background and transcribe all conversations."
)
return transcription.text
except FileNotFoundError:
print(f"Error: File {file_path} was not found.")
return None
except Exception as e:
print(f"Error: {e}")
return None
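# Caveat (general API behaviour, not specific to this assignment): the transcription
# endpoint enforces an upload size limit, so very long videos may need to be trimmed or
# have their audio extracted before transcription.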
def _fetch_ddg_search_result_links(query: str) -> ResultSet[PageElement | Tag | NavigableString]:
url = "https://lite.duckduckgo.com/lite/"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
}
params = {
'q': query,
'kl': 'us-en'
}
    # A timeout keeps the tool from hanging indefinitely on a stalled connection.
    ddg_response = requests.get(url, headers=headers, params=params, timeout=30)
ddg_response.raise_for_status()
soup = BeautifulSoup(ddg_response.text, 'html.parser')
return soup.find_all('a', {'class': 'result-link'})
def _fetch_specific_page(url: str) -> str:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
}
    # A timeout keeps the tool from hanging indefinitely on a stalled connection.
    page_response = requests.get(url, headers=headers, timeout=30)
    page_response.raise_for_status()
    soup = BeautifulSoup(page_response.text, 'html.parser')
    return soup.get_text()
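

# Minimal local smoke test (a sketch for manual testing, not part of the graded agent).
# It only exercises check_commutativity, which needs no network access or API key.
if __name__ == "__main__":
    example_table = """
    |*|a|b|c|d|e|
    |---|---|---|---|---|---|
    |a|a|b|c|b|d|
    |b|b|c|a|e|c|
    |c|c|a|b|b|a|
    |d|b|e|b|e|d|
    |e|d|b|a|d|c|
    """
    # Expected output for this table: "b, e"
    print(check_commutativity.invoke({"table_definition": example_table}))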