import os
import base64
import urllib3
import certifi
import requests
import tempfile
import pandas as pd
from openai import OpenAI
from pytubefix import YouTube
from langchain_community.tools import tool
from langchain_community.document_loaders import WikipediaLoader
from bs4 import BeautifulSoup, ResultSet, PageElement, Tag, NavigableString

urllib3.disable_warnings()
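# Warnings are disabled because the HTTP helpers below intentionally call
# requests.get(..., verify=False); urllib3 would otherwise emit an
# InsecureRequestWarning for every request.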


@tool
def default_file_reader(file_path: str) -> str | None:
    """
        Default file reader tool that opens a file as text, reads its content and returns it as a string.
        Use this default tool if there is no specific file reader for a given file.
    """

    try:
        with open(file_path, 'r') as file:
            return file.read()
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def image_reader(file_path: str) -> dict[str, str | dict[str, str]] | None:
    """
        Opens a PNG image and returns its data as a base64-encoded image_url dictionary.
    """

    try:
        with open(file_path, "rb") as image_file:
            image_data = base64.b64encode(image_file.read()).decode('utf-8')

        return {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}}
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def excel_column_reader(file_path: str) -> str | None:
    """
        Opens an Excel file, reads the first row to get the names of the columns and returns them as a string.
        Use it to find out what data is available in the Excel file.
    """

    try:
        df = pd.read_excel(file_path)
        return '    '.join(df.columns.astype(str))
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None


@tool
def excel_find_column_values_sum(file_path: str, columns: list[str]) -> float | None:
    """Opens an Excel file, finds the specified columns by name and returns the total sum of all numeric cells in those columns."""

    try:
        total = 0
        df = pd.read_excel(file_path)

        for column in columns:
            total += df[column].sum()

        return total
    except (FileNotFoundError, KeyError) as e:
        print(f"Error:{e}")
        return None


@tool
def wiki_search(query: str) -> str | None:
    """
        Searches Wikipedia by query string and returns the content of the first page found.
        Also use it to get information about shows and actors.
    """

    try:
        wiki_results = ""
        search_docs = WikipediaLoader(query=query, load_max_docs=1).load()
        for doc in search_docs:
            if "source" in doc.metadata and doc.metadata["source"]:
                wiki_results += _fetch_specific_page(doc.metadata["source"])

        return wiki_results
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None


@tool
def archive_search(query: str) -> str | None:
    """
        Searches archive.org by query string and returns the content of the first page found.
        Use this search when you need to find a scientific paper or a specific detail of a scientific publication.
    """

    try:
        ddg_results = []
        archive_results = ""
        link_rows = _fetch_ddg_search_result_links(f"archive.org {query}")
        for link_row in link_rows:
            if 'archive.org' not in link_row.attrs['href']:
                continue

            ddg_results.append({
                'title': link_row.get_text(strip=True),
                'url': link_row.attrs['href']
            })

            archive_results += _fetch_specific_page(link_row.attrs['href'])

            if len(ddg_results) == 1:
                break

        return archive_results
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None


@tool
def get_ioc_code(country_name: str) -> str | None:
    """
        Accepts a country name as a string and returns the IOC code of that country.
    """

    try:
        ioc_df = pd.read_html('https://en.wikipedia.org/wiki/List_of_IOC_country_codes')[0]
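        # The 'Code' cells parsed from the Wikipedia table can carry extra markup text,
        # so keep only the trailing three-letter IOC code.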
        ioc_df['Code'] = ioc_df['Code'].str[-3:]
        name_to_code = dict(zip(ioc_df['National Olympic Committee'], ioc_df['Code']))

        return name_to_code.get(country_name)
    except Exception as e:
        print(f"Error: {e}")
        return None

@tool
def check_commutativity(table_definition: str) -> str | None:
    """
        Use this tool if you need to verify whether a binary operation defined by a table is commutative.
        Returns a comma-separated, alphabetically sorted string of the elements involved in
        pairs that violate x∗y=y∗x (which proves that * is not commutative), or None if the operation is commutative.
        Example of table definition:
        |*|a|b|c|d|e|
        |---|---|---|---|---|---|
        |a|a|b|c|b|d|
        |b|b|c|a|e|c|
        |c|c|a|b|b|a|
        |d|b|e|b|e|d|
        |e|d|b|a|d|c|
    """
    lines = [line.strip() for line in table_definition.strip().splitlines() if
             line.strip().startswith('|') and not line.strip().startswith('|-')]

    # Parse header: skip the '*' cell
    header_cells = [cell.strip() for cell in lines[0].split('|')[1:] if cell.strip()]
    S = header_cells[1:]  # Skip the first header cell which is "*"

    operation_table = {}
    for row in lines[1:]:
        cells = [cell.strip() for cell in row.split('|')[1:] if cell.strip()]
        row_label = cells[0]
        values = cells[1:]
        if len(values) != len(S):
            raise ValueError(f"Row {row_label} does not have the correct number of entries.")
        operation_table[row_label] = dict(zip(S, values))

    counter_example_elements = set()

    for x in S:
        for y in S:
            if operation_table[x][y] != operation_table[y][x]:
                counter_example_elements.update([x, y])


    return ', '.join(sorted(counter_example_elements)) if len(counter_example_elements) > 0 else None
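
# Example: for the table shown in the docstring of check_commutativity, only the
# pair (b, e) violates commutativity (b*e == 'c' while e*b == 'b'), so the tool
# returns "b, e"; for a fully commutative table it returns None.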

@tool
def audio_to_text(file_path: str) -> str | None:
    """
        Transcribes an audio file to text and returns the text as a string.
    """
    try:
        client = OpenAI()

        # Use a context manager so the file handle is closed after the request.
        with open(file_path, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=audio_file
            )

        return transcription.text
    except Exception as e:
        print(f"Error: {e}")
        return None

@tool
def video_to_text(video_url: str) -> str | None:
    """
        Downloads a YouTube video by URL, transcribes it to text and returns the text as a string.
    """
    file_path = ""
    try:
        youtube = YouTube(video_url)
        temp_dir = tempfile.gettempdir()
        stream = youtube.streams.get_highest_resolution()
        file_path = stream.download(output_path=temp_dir)

        client = OpenAI()

        # Use a context manager so the downloaded video's file handle is closed after the request.
        with open(file_path, "rb") as video_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=video_file,
                temperature=0.0,
                prompt="Ignore music playing in the background and transcribe all conversations."
            )

        return transcription.text
    except FileNotFoundError:
        print(f"Error: File {file_path} was not found.")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None


def _fetch_ddg_search_result_links(query: str) -> ResultSet[PageElement | Tag | NavigableString]:
    url = "https://lite.duckduckgo.com/lite/"

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }

    params = {
        'q': query,
        'kl': 'us-en'
    }

    ddg_response = requests.get(url, headers=headers, params=params, verify=False)
    ddg_response.raise_for_status()
    soup = BeautifulSoup(ddg_response.text, 'html.parser')
    return soup.find_all('a', {'class': 'result-link'})


def _fetch_specific_page(url: str) -> str:
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }

    page_response = requests.get(url, headers=headers, verify=False)
    page_response.raise_for_status()

    soup = BeautifulSoup(page_response.text, 'html.parser')
    return soup.get_text()
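

if __name__ == "__main__":
    # Minimal usage sketch: LangChain @tool functions are called via .invoke()
    # with a dict of arguments. The table below is the example from the
    # check_commutativity docstring; "Japan" is just an illustrative value and
    # the get_ioc_code call needs network access.
    example_table = (
        "|*|a|b|c|d|e|\n"
        "|---|---|---|---|---|---|\n"
        "|a|a|b|c|b|d|\n"
        "|b|b|c|a|e|c|\n"
        "|c|c|a|b|b|a|\n"
        "|d|b|e|b|e|d|\n"
        "|e|d|b|a|d|c|"
    )
    print(check_commutativity.invoke({"table_definition": example_table}))  # expected: "b, e"
    print(get_ioc_code.invoke({"country_name": "Japan"}))  # prints the IOC code, or None on failure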