import os
import base64
import urllib3
import certifi
import requests
import tempfile
import pandas as pd
from openai import OpenAI
from pytubefix import YouTube
from langchain_community.tools import tool
from langchain_community.document_loaders import WikipediaLoader
from bs4 import BeautifulSoup, ResultSet, PageElement, Tag, NavigableString

# Silence InsecureRequestWarning: the scraping helpers below deliberately use
# verify=False (see _fetch_ddg_search_result_links / _fetch_specific_page).
urllib3.disable_warnings()


@tool
def default_file_reader(file_path: str) -> str | None:
    """
    Default file reader tool that opens a file as text, reads its content and returns it as a string.
    Use this default tool if there is no specific file reader for a given file.
    """
    try:
        with open(file_path, 'r') as file:
            return file.read()
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def image_reader(file_path: str) -> dict[str, str | dict[str, str]] | None:
    """
    Opens a png image and returns its data as a dictionary.
    """
    try:
        with open(file_path, "rb") as image_file:
            image_data = base64.b64encode(image_file.read()).decode('utf-8')
        # OpenAI-style "image_url" content part carrying a base64 data URL.
        return {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}}
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def excel_column_reader(file_path: str) -> str | None:
    """
    Opens an Excel file, reads the first row to get the names of the columns
    and returns it as a string. Use it to find out what data is available in the Excel file.
    """
    try:
        df = pd.read_excel(file_path)
        # Column labels may be non-string (e.g. dates/ints), hence astype(str).
        return ' '.join(df.columns.astype(str))
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def excel_find_column_values_sum(file_path: str, columns: list[str]) -> None | int:
    """
    Opens an Excel file, finds specified columns by column name and calculates
    a total sum of all numeric cells of the specified columns.

    Returns None when the file does not exist or a requested column is missing.
    """
    try:
        total = 0
        df = pd.read_excel(file_path)
        for column in columns:
            total += df[column].sum()
        return total
    # KeyError: a requested column is not present in the sheet — return None
    # instead of crashing, consistent with the other tools in this module.
    except (FileNotFoundError, KeyError) as e:
        print(f"Error:{e}")
        return None


@tool
def wiki_search(query: str) -> str | None:
    """
    Search wikipedia by query string and return content of the first found page.
    Also use it to get information about shows and actors.
    """
    try:
        wiki_results = ""
        search_docs = WikipediaLoader(query=query, load_max_docs=1).load()
        for doc in search_docs:
            # The loader's summary is short; fetch the full page text from the
            # source URL stored in the document metadata.
            if "source" in doc.metadata and doc.metadata["source"]:
                wiki_results += _fetch_specific_page(doc.metadata["source"])
        return wiki_results
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None


@tool
def archive_search(query: str) -> str | None:
    """
    Search archive.org by query string and return content of the first found page.
    Use this search when you need to find a scientific paper or a specific
    scientific publication detail.
    """
    try:
        ddg_results = []
        archive_results = ""
        # Use DuckDuckGo restricted to archive.org rather than archive.org's own
        # search API, then fetch only the first archive.org hit.
        link_rows = _fetch_ddg_search_result_links(f"archive.org {query}")
        for link_row in link_rows:
            if 'archive.org' not in link_row.attrs['href']:
                continue
            ddg_results.append({
                'title': link_row.get_text(strip=True),
                'url': link_row.attrs['href']
            })
            archive_results += _fetch_specific_page(link_row.attrs['href'])
            if len(ddg_results) == 1:
                break
        return archive_results
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None


@tool
def get_ioc_code(country_name: str) -> str | None:
    """
    Accepts country name as a string and returns the IOC code of this country.
    """
    try:
        ioc_df = pd.read_html('https://en.wikipedia.org/wiki/List_of_IOC_country_codes')[0]
        # The scraped "Code" cells carry extra leading characters (flag markup);
        # the actual three-letter IOC code is the last 3 characters.
        ioc_df['Code'] = ioc_df['Code'].str[-3:]
        name_to_code = dict(zip(ioc_df['National Olympic Committee'], ioc_df['Code']))
        return name_to_code.get(country_name)
    except Exception as e:
        print(f"Error: {e}")
        return None


@tool
def check_commutativity(table_definition: str) -> str | None:
    """
    Use this tool if you need to verify whether a binary operation defined by a
    table is commutative.

    Returns a comma-separated, alphabetically sorted string of the elements
    involved in counter-examples (pairs violating x*y = y*x, proving * is not
    commutative), or None if the operation is commutative.

    Raises ValueError if a table row does not have the correct number of entries.

    Example of table definition:
    |*|a|b|c|d|e|
    |---|---|---|---|---|---|
    |a|a|b|c|b|d|
    |b|b|c|a|e|c|
    |c|c|a|b|b|a|
    |d|b|e|b|e|d|
    |e|d|b|a|d|c|
    """
    # Keep only data rows: lines starting with '|' but not the '|---|' separator.
    lines = [line.strip() for line in table_definition.strip().splitlines()
             if line.strip().startswith('|') and not line.strip().startswith('|-')]
    # Parse header; the first header cell is "*" and is skipped below.
    header_cells = [cell.strip() for cell in lines[0].split('|')[1:] if cell.strip()]
    S = header_cells[1:]
    operation_table = {}
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')[1:] if cell.strip()]
        row_label = cells[0]
        values = cells[1:]
        if len(values) != len(S):
            raise ValueError(f"Row {row_label} does not have the correct number of entries.")
        operation_table[row_label] = dict(zip(S, values))
    counter_example_elements = set()
    for x in S:
        for y in S:
            if operation_table[x][y] != operation_table[y][x]:
                counter_example_elements.update([x, y])
    return ', '.join(sorted(counter_example_elements)) if len(counter_example_elements) > 0 else None


@tool
def audio_to_text(file_path: str) -> str | None:
    """
    Transcribes an audio file to text and returns the text as a string.
    """
    try:
        client = OpenAI()
        # Context manager guarantees the handle is closed even if the API call fails.
        with open(file_path, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=audio_file
            )
        return transcription.text
    except Exception as e:
        print(f"Error: {e}")
        return None


@tool
def video_to_text(video_url: str) -> str | None:
    """
    Downloads a YouTube video by url, transcribes it to text and returns the
    text as a string.
    """
    file_path = ""
    try:
        ytx = YouTube(video_url)
        temp_dir = tempfile.gettempdir()
        ysx = ytx.streams.get_highest_resolution()
        file_path = ysx.download(output_path=temp_dir)
        client = OpenAI()
        # Context manager guarantees the handle is closed even if the API call fails.
        with open(file_path, "rb") as video_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=video_file,
                temperature=0.0,
                prompt="Ignore music playing in the background and transcribe all conversations."
            )
        return transcription.text
    except FileNotFoundError:
        print(f"Error: File {file_path} was not found.")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None


def _fetch_ddg_search_result_links(query: str) -> ResultSet[PageElement | Tag | NavigableString]:
    """Run a DuckDuckGo Lite search and return the result-link anchor tags."""
    url = "https://lite.duckduckgo.com/lite/"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }
    params = {
        'q': query,
        'kl': 'us-en'
    }
    ddg_response = requests.get(url, headers=headers, params=params, verify=False)
    ddg_response.raise_for_status()
    soup = BeautifulSoup(ddg_response.text, 'html.parser')
    return soup.find_all('a', {'class': 'result-link'})


def _fetch_specific_page(url: str) -> str:
    """Fetch a single page with a browser-like User-Agent and return its visible text."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }
    wiki_response = requests.get(url, headers=headers, verify=False)
    wiki_response.raise_for_status()
    soup = BeautifulSoup(wiki_response.text, 'html.parser')
    return soup.get_text()