DIVY118 committed on
Commit
7e2578f
·
verified ·
1 Parent(s): 4e72cef

Update RT.py

Browse files
Files changed (1) hide show
  1. RT.py +3 -1187
RT.py CHANGED
@@ -1,1189 +1,5 @@
1
#VAR
# Default cap on the number of search results.
# NOTE(review): MAX_RESULTS is not referenced anywhere in this chunk — confirm it is used elsewhere in the file.
MAX_RESULTS = 10
3
 
4
 
5
- import asyncio
6
- import logging
7
- import warnings
8
- from typing import Dict, Generator, Optional
9
- import nest_asyncio
10
- import json
11
- import logging
12
- import sys
13
- from collections import deque
14
- from datetime import datetime, timezone
15
- from decimal import Decimal
16
- from itertools import cycle
17
- from typing import AsyncGenerator, Deque, Dict, Optional, Set, Tuple
18
- from curl_cffi import requests
19
- from docstring_inheritance import GoogleDocstringInheritanceMeta
20
- from lxml import html
21
- import json
22
- import re
23
- from html import unescape
24
- from typing import Optional
25
- from urllib.parse import unquote
26
- from dataclasses import dataclass
27
- from typing import Dict, Optional
28
- from random import randint
29
-
30
-
31
class DuckDuckGoSearchException(Exception):
    """Raised for search failures: rate limits, HTTP errors, and response-parsing errors."""
33
-
34
-
35
@dataclass
class MapsResult:
    """Represents a result from the maps search."""

    title: Optional[str] = None            # place name
    address: Optional[str] = None          # postal address
    country_code: Optional[str] = None
    latitude: Optional[str] = None         # geographic coordinate (north-south)
    longitude: Optional[str] = None        # geographic coordinate (east-west)
    url: Optional[str] = None              # place website
    desc: Optional[str] = None             # description from the embed block
    phone: Optional[str] = None
    image: Optional[str] = None            # image from the embed block
    source: Optional[str] = None           # source URL of the listing
    hours: Optional[Dict[str, str]] = None # opening hours
    category: Optional[str] = None         # DuckDuckGo category
    facebook: Optional[str] = None         # social profile links (built from ids)
    instagram: Optional[str] = None
    twitter: Optional[str] = None
54
-
55
-
56
# Matches rate-limit marker filenames like '506-00.js' inside a redirect URL.
REGEX_500_IN_URL = re.compile(r"(?:\d{3}-\d{2}\.js)")
# Non-greedy match of any HTML tag, used to strip markup from snippets.
REGEX_STRIP_TAGS = re.compile("<.*?>")
# Extracts the vqd anti-bot token from a response body (bytes pattern).
REGEX_VQD = re.compile(rb"""vqd=['"]?([^&"']+)""")
59
-
60
-
61
- def _extract_vqd(html_bytes: bytes, keywords: str) -> Optional[str]:
62
- """Extract vqd from html using a regular expression."""
63
- try:
64
- match = REGEX_VQD.search(html_bytes)
65
- if match:
66
- return match.group(1).decode()
67
- except Exception:
68
- pass
69
- raise DuckDuckGoSearchException(
70
- f"_extract_vqd() {keywords=} Could not extract vqd.")
71
-
72
-
73
- def _text_extract_json(html_bytes: bytes, keywords: str) -> Optional[str]:
74
- """text(backend="api") -> extract json from html."""
75
- try:
76
- start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24
77
- end = html_bytes.index(b");DDG.duckbar.load(", start)
78
- data = html_bytes[start:end]
79
- return json.loads(data)
80
- except Exception as ex:
81
- raise DuckDuckGoSearchException(
82
- f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex
83
-
84
-
85
- def _is_500_in_url(url: str) -> bool:
86
- """Something like '506-00.js' inside the url."""
87
- return bool(REGEX_500_IN_URL.search(url))
88
-
89
-
90
- def _normalize(raw_html: str) -> str:
91
- """Strip HTML tags from the raw_html string."""
92
- return unescape(REGEX_STRIP_TAGS.sub("", raw_html)) if raw_html else ""
93
-
94
-
95
- def _normalize_url(url: str) -> str:
96
- """Unquote URL and replace spaces with '+'."""
97
- return unquote(url.replace(" ", "+")) if url else ""
98
-
99
-
100
logger = logging.getLogger("duckduckgo_search.AsyncDDGS")
# curl_cffi's proactor support is not implemented on Windows; force the selector
# event loop there. Not working on Windows, NotImplementedError
# (https://curl-cffi.readthedocs.io/en/latest/faq/)
if sys.platform.lower().startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
104
-
105
-
106
class AsyncDDGS(metaclass=GoogleDocstringInheritanceMeta):
    """webscout_search async class to get search results from duckduckgo.com."""
108
-
109
- def __init__(self, headers=None, proxies=None, timeout=10) -> None:
110
- """Initialize the AsyncDDGS object.
111
-
112
- Args:
113
- headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None.
114
- proxies (Union[dict, str], optional): Proxies for the HTTP client (can be dict or str). Defaults to None.
115
- timeout (int, optional): Timeout value for the HTTP client. Defaults to 10.
116
- """
117
- useragent = f'{randint(0, 1000000)}'
118
- headers = {'User-Agent': useragent}
119
- self.proxies = proxies if proxies and isinstance(proxies, dict) else {
120
- "http": proxies,
121
- "https": proxies
122
- }
123
- self._asession = requests.AsyncSession(headers=headers,
124
- proxies=self.proxies,
125
- timeout=timeout,
126
- impersonate="chrome")
127
- self._asession.headers["Referer"] = "https://duckduckgo.com/"
128
-
129
    async def __aenter__(self) -> "AsyncDDGS":
        """A context manager method that is called when entering the 'with' statement.

        Returns:
            This instance, unchanged; the session is already open from __init__.
        """
        return self
132
-
133
- async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
134
- """Closes the session."""
135
- return self._asession.close()
136
-
137
    async def _aget_url(self, method: str, url: str,
                        **kwargs) -> Optional[bytes]:
        """Perform one HTTP request and return the raw response body.

        Args:
            method: HTTP verb ("GET", "POST", ...).
            url: absolute URL to request.
            **kwargs: extra arguments forwarded to the session request
                (params, data, ...). Do not pass `headers` — it is set here.

        Returns:
            Response body bytes on HTTP 200, or None for other non-error statuses.

        Raises:
            DuckDuckGoSearchException: on rate limiting (202 status or a
                NNN-NN.js redirect URL) and on any transport/HTTP error.
        """
        try:
            # A fresh random User-Agent per request, mirroring __init__.
            useragent = f'{randint(0, 1000000)}'
            headers = {'User-Agent': useragent}
            resp = await self._asession.request(method,
                                                url,
                                                stream=True,
                                                **kwargs,
                                                headers=headers)
            resp.raise_for_status()
            resp_content = await resp.acontent()
            logger.debug(
                f"_aget_url() {url} {resp.status_code} {resp.http_version} {resp.elapsed} {len(resp_content)}"
            )
            # DuckDuckGo signals rate limiting via a NNN-NN.js URL or a 202.
            if _is_500_in_url(str(resp.url)) or resp.status_code == 202:
                raise DuckDuckGoSearchException("Ratelimit")
            if resp.status_code == 200:
                return resp_content
        except Exception as ex:
            # Re-wrap every failure (including the Ratelimit above) uniformly.
            raise DuckDuckGoSearchException(
                f"_aget_url() {url} {type(ex).__name__}: {ex}") from ex
159
-
160
    async def _aget_vqd(self, keywords: str) -> Optional[str]:
        """Get vqd value for a search query.

        POSTs the query to the DuckDuckGo landing page and scrapes the
        anti-bot token out of the returned HTML.

        Returns:
            The vqd token, or None if the request yielded no body.
        """
        resp_content = await self._aget_url("POST",
                                            "https://duckduckgo.com",
                                            data={"q": keywords})
        if resp_content:
            return _extract_vqd(resp_content, keywords)
167
-
168
- async def text(
169
- self,
170
- keywords: str,
171
- region: str = "wt-wt",
172
- safesearch: str = "moderate",
173
- timelimit: Optional[str] = None,
174
- backend: str = "api",
175
- max_results: Optional[int] = None,
176
- ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
177
- """DuckDuckGo text search generator. Query params: https://duckduckgo.com/params.
178
-
179
- Args:
180
- keywords: keywords for query.
181
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
182
- safesearch: on, moderate, off. Defaults to "moderate".
183
- timelimit: d, w, m, y. Defaults to None.
184
- backend: api, html, lite. Defaults to api.
185
- api - collect data from https://duckduckgo.com,
186
- html - collect data from https://html.duckduckgo.com,
187
- lite - collect data from https://lite.duckduckgo.com.
188
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
189
-
190
- Yields:
191
- dict with search results.
192
-
193
- """
194
- if backend == "api":
195
- results = self._text_api(keywords, region, safesearch, timelimit,
196
- max_results)
197
- elif backend == "html":
198
- results = self._text_html(keywords, region, safesearch, timelimit,
199
- max_results)
200
- elif backend == "lite":
201
- results = self._text_lite(keywords, region, timelimit, max_results)
202
-
203
- async for result in results:
204
- yield result
205
-
206
    async def _text_api(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.

        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        payload = {
            "q": keywords,
            "kl": region,
            "l": region,
            "bing_market": region,
            "s": "0",
            "df": timelimit,
            "vqd": vqd,
            # "o": "json",
            "sp": "0",
        }
        # Safesearch is encoded differently per level on this endpoint.
        safesearch = safesearch.lower()
        if safesearch == "moderate":
            payload["ex"] = "-1"
        elif safesearch == "off":
            payload["ex"] = "-2"
        elif safesearch == "on":  # strict
            payload["p"] = "1"

        # `cache` deduplicates hrefs and doubles as the yielded-result counter.
        cache = set()
        # Hard cap of 11 pages bounds the number of requests per query.
        for _ in range(11):
            resp_content = await self._aget_url("GET",
                                                "https://links.duckduckgo.com/d.js",
                                                params=payload)
            if resp_content is None:
                return

            page_data = _text_extract_json(resp_content, keywords)
            if page_data is None:
                return

            result_exists, next_page_url = False, None
            for row in page_data:
                href = row.get("u", None)
                # Skip duplicates and DuckDuckGo's google-fallback pseudo-result.
                if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
                    cache.add(href)
                    body = _normalize(row["a"])
                    if body:
                        result_exists = True
                        yield {
                            "title": _normalize(row["t"]),
                            "href": _normalize_url(href),
                            "body": body,
                        }
                        if max_results and len(cache) >= max_results:
                            return
                else:
                    # Rows without a usable href carry the next-page URL instead.
                    next_page_url = row.get("n", None)
            if max_results is None or result_exists is False or next_page_url is None:
                return
            # Advance pagination offset parsed out of the next-page URL.
            payload["s"] = next_page_url.split("s=")[1].split("&")[0]
282
-
283
    async def _text_html(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.

        """
        assert keywords, "keywords is mandatory"

        # NOTE(review): mutates the shared session's Referer for all later requests.
        self._asession.headers["Referer"] = "https://html.duckduckgo.com/"
        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "q": keywords,
            "s": "0",
            "kl": region,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
        }
        cache: Set[str] = set()
        # Hard cap of 11 pages bounds the number of requests per query.
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://html.duckduckgo.com/html",
                                                data=payload)
            if resp_content is None:
                return

            tree = html.fromstring(resp_content)
            if tree.xpath('//div[@class="no-results"]/text()'):
                return

            result_exists = False
            for e in tree.xpath('//div[contains(@class, "results_links")]'):
                href = e.xpath('.//a[contains(@class, "result__a")]/@href')
                href = href[0] if href else None
                # Skip duplicates, the google-fallback pseudo-result, and ads.
                if (href and href not in cache
                        and href != f"http://www.google.com/search?q={keywords}"
                        and not href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                    cache.add(href)
                    title = e.xpath('.//a[contains(@class, "result__a")]/text()')
                    body = e.xpath('.//a[contains(@class, "result__snippet")]//text()')
                    result_exists = True
                    yield {
                        "title": _normalize(title[0]) if title else None,
                        "href": _normalize_url(href),
                        "body": _normalize("".join(body)) if body else None,
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_page = tree.xpath('.//div[@class="nav-link"]')
            next_page = next_page[-1] if next_page else None
            if next_page is None:
                return

            # The next page is requested with the hidden form fields of the
            # last nav-link form; the old payload is fully replaced.
            names = next_page.xpath('.//input[@type="hidden"]/@name')
            values = next_page.xpath('.//input[@type="hidden"]/@value')
            payload = {n: v for n, v in zip(names, values)}
355
-
356
    async def _text_lite(
        self,
        keywords: str,
        region: str = "wt-wt",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.

        """
        assert keywords, "keywords is mandatory"

        # NOTE(review): mutates the shared session's Referer for all later requests.
        self._asession.headers["Referer"] = "https://lite.duckduckgo.com/"
        payload = {
            "q": keywords,
            "s": "0",
            "o": "json",
            "api": "d.js",
            "kl": region,
            "df": timelimit,
        }
        cache: Set[str] = set()
        # Hard cap of 11 pages bounds the number of requests per query.
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://lite.duckduckgo.com/lite/",
                                                data=payload)
            if resp_content is None:
                return

            if b"No more results." in resp_content:
                return

            tree = html.fromstring(resp_content)

            result_exists = False
            # The lite page lays each result out as 4 consecutive <tr> rows:
            # 1=link, 2=snippet, 3=meta (yield point), 4=spacer. `cycle(1..4)`
            # tags each row with its position in that repeating block.
            data = zip(cycle(range(1, 5)), tree.xpath("//table[last()]//tr"))
            for i, e in data:
                if i == 1:
                    href = e.xpath(".//a//@href")
                    href = href[0] if href else None
                    if (href is None or href in cache
                            or href == f"http://www.google.com/search?q={keywords}"
                            or href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                        # Consume the remaining 3 rows of this unwanted block.
                        [next(data, None) for _ in range(3)]  # skip block(i=1,2,3,4)
                    else:
                        # `title`/`href` are carried to the i==3 yield below.
                        cache.add(href)
                        title = e.xpath(".//a//text()")[0]
                elif i == 2:
                    body = e.xpath(".//td[@class='result-snippet']//text()")
                    body = "".join(body).strip()
                elif i == 3:
                    # NOTE(review): relies on i==1/i==2 having bound title/body
                    # for this block; the skip above keeps that invariant.
                    result_exists = True
                    yield {
                        "title": _normalize(title),
                        "href": _normalize_url(href),
                        "body": _normalize(body),
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_page_s = tree.xpath(
                "//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value")
            if not next_page_s:
                return
            payload["s"] = next_page_s[0]
            payload["vqd"] = _extract_vqd(resp_content, keywords)
432
-
433
    async def images(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        size: Optional[str] = None,
        color: Optional[str] = None,
        type_image: Optional[str] = None,
        layout: Optional[str] = None,
        license_image: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout images search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: Day, Week, Month, Year. Defaults to None.
            size: Small, Medium, Large, Wallpaper. Defaults to None.
            color: color, Monochrome, Red, Orange, Yellow, Green, Blue,
                Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None.
            type_image: photo, clipart, gif, transparent, line.
                Defaults to None.
            layout: Square, Tall, Wide. Defaults to None.
            license_image: any (All Creative Commons), Public (PublicDomain),
                Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially),
                Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and
                Use Commercially). Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with image search results.

        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": 1, "off": -1}
        # Each filter is folded into the comma-separated "f" query parameter.
        timelimit = f"time:{timelimit}" if timelimit else ""
        size = f"size:{size}" if size else ""
        color = f"color:{color}" if color else ""
        type_image = f"type:{type_image}" if type_image else ""
        layout = f"layout:{layout}" if layout else ""
        license_image = f"license:{license_image}" if license_image else ""
        payload = {
            "l": region,
            "o": "json",
            "q": keywords,
            "vqd": vqd,
            "f":
            f"{timelimit},{size},{color},{type_image},{layout},{license_image}",
            "p": safesearch_base[safesearch.lower()],
        }

        # `cache` deduplicates image URLs and doubles as the result counter.
        cache = set()
        # Hard cap of 10 pages bounds the number of requests per query.
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/i.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                image_url = row.get("image", None)
                if image_url and image_url not in cache:
                    cache.add(image_url)
                    result_exists = True
                    yield {
                        "title": row["title"],
                        "image": _normalize_url(image_url),
                        "thumbnail": _normalize_url(row["thumbnail"]),
                        "url": _normalize_url(row["url"]),
                        "height": row["height"],
                        "width": row["width"],
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # Pagination offset is parsed out of the "next" URL.
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]
528
-
529
    async def videos(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        resolution: Optional[str] = None,
        duration: Optional[str] = None,
        license_videos: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout videos search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            resolution: high, standart. Defaults to None.
            duration: short, medium, long. Defaults to None.
            license_videos: creativeCommon, youtube. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with videos search results

        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        # Each filter is folded into the comma-separated "f" query parameter.
        timelimit = f"publishedAfter:{timelimit}" if timelimit else ""
        resolution = f"videoDefinition:{resolution}" if resolution else ""
        duration = f"videoDuration:{duration}" if duration else ""
        license_videos = f"videoLicense:{license_videos}" if license_videos else ""
        payload = {
            "l": region,
            "o": "json",
            "s": 0,
            "q": keywords,
            "vqd": vqd,
            "f": f"{timelimit},{resolution},{duration},{license_videos}",
            "p": safesearch_base[safesearch.lower()],
        }

        # Deduplicate on the "content" URL; doubles as the result counter.
        cache = set()
        # Hard cap of 10 pages bounds the number of requests per query.
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/v.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                if row["content"] not in cache:
                    cache.add(row["content"])
                    result_exists = True
                    # Rows are yielded verbatim as returned by the API.
                    yield row
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # Pagination offset is parsed out of the "next" URL.
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]
604
-
605
    async def news(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout news search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with news search results.

        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "l": region,
            "o": "json",
            "noamp": "1",
            "q": keywords,
            "vqd": vqd,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
            "s": 0,
        }

        # Deduplicate on article URL; doubles as the result counter.
        cache = set()
        # Hard cap of 10 pages bounds the number of requests per query.
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/news.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                if row["url"] not in cache:
                    cache.add(row["url"])
                    image_url = row.get("image", None)
                    result_exists = True
                    yield {
                        # Epoch seconds converted to an ISO-8601 UTC timestamp.
                        "date": datetime.fromtimestamp(row["date"],
                                                       timezone.utc).isoformat(),
                        "title": row["title"],
                        "body": _normalize(row["excerpt"]),
                        "url": _normalize_url(row["url"]),
                        "image": _normalize_url(image_url) if image_url else None,
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # Pagination offset is parsed out of the "next" URL.
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]
680
-
681
    async def answers(
            self, keywords: str) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout instant answers. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.

        Yields:
            dict with instant answers results.

        """
        assert keywords, "keywords is mandatory"

        payload = {
            "q": f"what is {keywords}",
            "format": "json",
        }

        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): yields None and then falls through to json.loads(None);
            # the except below absorbs the resulting TypeError. Confirm intended.
            yield None
        try:
            page_data = json.loads(resp_content)
        except Exception:
            page_data = None

        if page_data:
            answer = page_data.get("AbstractText", None)
            url = page_data.get("AbstractURL", None)
            if answer:
                yield {
                    "icon": None,
                    "text": answer,
                    "topic": None,
                    "url": url,
                }

        # related:
        payload = {
            "q": f"{keywords}",
            "format": "json",
        }
        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): same fall-through pattern as above.
            yield None
        try:
            page_data = json.loads(resp_content).get("RelatedTopics", None)
        except Exception:
            page_data = None

        if page_data:
            for row in page_data:
                # Rows without a "Name" are direct topics; named rows group sub-topics.
                topic = row.get("Name", None)
                if not topic:
                    icon = row["Icon"].get("URL", None)
                    yield {
                        "icon": f"https://duckduckgo.com{icon}" if icon else None,
                        "text": row["Text"],
                        "topic": None,
                        "url": row["FirstURL"],
                    }
                else:
                    for subrow in row["Topics"]:
                        icon = subrow["Icon"].get("URL", None)
                        yield {
                            "icon": f"https://duckduckgo.com{icon}" if icon else None,
                            "text": subrow["Text"],
                            "topic": topic,
                            "url": subrow["FirstURL"],
                        }
755
-
756
    async def suggestions(
        self,
        keywords: str,
        region: str = "wt-wt") -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout suggestions. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".

        Yields:
            dict with suggestions results.
        """
        assert keywords, "keywords is mandatory"

        payload = {
            "q": keywords,
            "kl": region,
        }
        resp_content = await self._aget_url("GET",
                                            "https://duckduckgo.com/ac",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): yields None then still attempts json.loads(None);
            # the except below silently absorbs the failure. Confirm intended.
            yield None
        try:
            page_data = json.loads(resp_content)
            for r in page_data:
                yield r
        except Exception:
            pass
786
-
787
    async def maps(
        self,
        keywords: str,
        place: Optional[str] = None,
        street: Optional[str] = None,
        city: Optional[str] = None,
        county: Optional[str] = None,
        state: Optional[str] = None,
        country: Optional[str] = None,
        postalcode: Optional[str] = None,
        latitude: Optional[str] = None,
        longitude: Optional[str] = None,
        radius: int = 0,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout maps search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query
            place: if set, the other parameters are not used. Defaults to None.
            street: house number/street. Defaults to None.
            city: city of search. Defaults to None.
            county: county of search. Defaults to None.
            state: state of search. Defaults to None.
            country: country of search. Defaults to None.
            postalcode: postalcode of search. Defaults to None.
            latitude: geographic coordinate (north-south position). Defaults to None.
            longitude: geographic coordinate (east-west position); if latitude and
                longitude are set, the other parameters are not used. Defaults to None.
            radius: expand the search square by the distance in kilometers. Defaults to 0.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with maps search results
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        # if longitude and latitude are specified, skip the request about bbox to the nominatim api
        if latitude and longitude:
            # Accept comma as decimal separator; start with a zero-size bbox.
            lat_t = Decimal(latitude.replace(",", "."))
            lat_b = Decimal(latitude.replace(",", "."))
            lon_l = Decimal(longitude.replace(",", "."))
            lon_r = Decimal(longitude.replace(",", "."))
            if radius == 0:
                radius = 1
        # otherwise request about bbox to nominatim api
        else:
            if place:
                params: Dict[str, Optional[str]] = {
                    "q": place,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            else:
                params = {
                    "street": street,
                    "city": city,
                    "county": county,
                    "state": state,
                    "country": country,
                    "postalcode": postalcode,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            try:
                resp_content = await self._aget_url(
                    "GET",
                    "https://nominatim.openstreetmap.org/search.php",
                    params=params,
                )
                if resp_content is None:
                    # NOTE(review): yields None then still parses resp_content;
                    # the except below converts the failure into a return.
                    yield None

                # Nominatim boundingbox order: [lat_min, lat_max, lon_min, lon_max].
                coordinates = json.loads(resp_content)[0]["boundingbox"]
                lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2])
                lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3])
            except Exception as ex:
                logger.debug(
                    f"ddg_maps() keywords={keywords} {type(ex).__name__} {ex}")
                return

        # if a radius is specified, expand the search square
        # (0.008983 degrees of latitude is approximately one kilometer).
        lat_t += Decimal(radius) * Decimal(0.008983)
        lat_b -= Decimal(radius) * Decimal(0.008983)
        lon_l -= Decimal(radius) * Decimal(0.008983)
        lon_r += Decimal(radius) * Decimal(0.008983)
        logger.debug(f"bbox coordinates\n{lat_t} {lon_l}\n{lat_b} {lon_r}")

        # create a queue of search squares (bboxes)
        work_bboxes: Deque[Tuple[Decimal, Decimal, Decimal, Decimal]] = deque()
        work_bboxes.append((lat_t, lon_l, lat_b, lon_r))

        # bbox iterate: `cache` deduplicates "title address" pairs and counts results.
        cache = set()
        while work_bboxes:
            lat_t, lon_l, lat_b, lon_r = work_bboxes.pop()
            params = {
                "q": keywords,
                "vqd": vqd,
                "tg": "maps_places",
                "rt": "D",
                "mkexp": "b",
                "wiki_info": "1",
                "is_requery": "1",
                "bbox_tl": f"{lat_t},{lon_l}",
                "bbox_br": f"{lat_b},{lon_r}",
                "strict_bbox": "1",
            }
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/local.js",
                                                params=params)
            if resp_content is None:
                return
            try:
                page_data = json.loads(resp_content).get("results", [])
            except Exception:
                return
            if page_data is None:
                return

            for res in page_data:
                result = MapsResult()
                result.title = res["name"]
                result.address = res["address"]
                if f"{result.title} {result.address}" in cache:
                    continue
                else:
                    cache.add(f"{result.title} {result.address}")
                    result.country_code = res["country_code"]
                    result.url = _normalize_url(res["website"])
                    result.phone = res["phone"]
                    result.latitude = res["coordinates"]["latitude"]
                    result.longitude = res["coordinates"]["longitude"]
                    result.source = _normalize_url(res["url"])
                    if res["embed"]:
                        result.image = res["embed"].get("image", "")
                        result.desc = res["embed"].get("description", "")
                    result.hours = res["hours"]
                    result.category = res["ddg_category"]
                    # Social links are synthesized from profile ids when present.
                    result.facebook = f"www.facebook.com/profile.php?id={x}" if (
                        x := res["facebook_id"]) else None
                    result.instagram = f"https://www.instagram.com/{x}" if (
                        x := res["instagram_id"]) else None
                    result.twitter = f"https://twitter.com/{x}" if (
                        x := res["twitter_id"]) else None
                    yield result.__dict__
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None:
                return
            # divide the square into 4 parts and add to the queue: a full page
            # (>= 15 places) suggests the bbox is dense enough to subdivide.
            if len(page_data) >= 15:
                lat_middle = (lat_t + lat_b) / 2
                lon_middle = (lon_l + lon_r) / 2
                bbox1 = (lat_t, lon_l, lat_middle, lon_middle)
                bbox2 = (lat_t, lon_middle, lat_middle, lon_r)
                bbox3 = (lat_middle, lon_l, lat_b, lon_middle)
                bbox4 = (lat_middle, lon_middle, lat_b, lon_r)
                work_bboxes.extendleft([bbox1, bbox2, bbox3, bbox4])
948
-
949
    async def translate(self,
                        keywords: str,
                        from_: Optional[str] = None,
                        to: str = "en") -> Optional[Dict[str, Optional[str]]]:
        """webscout translate.

        Args:
            keywords: string or a list of strings to translate
            from_: translate from (defaults automatically). Defaults to None.
            to: what language to translate. Defaults to "en".

        Returns:
            dict with translated keywords, or None on request/parse failure.
        """
        assert keywords, "keywords is mandatory"

        # The translation endpoint expects the vqd for the literal query "translate".
        vqd = await self._aget_vqd("translate")

        payload = {
            "vqd": vqd,
            "query": "translate",
            "to": to,
        }
        if from_:
            payload["from"] = from_

        # The text to translate is sent as the raw request body, not a param.
        resp_content = await self._aget_url(
            "POST",
            "https://duckduckgo.com/translation.js",
            params=payload,
            data=keywords.encode(),
        )
        if resp_content is None:
            return None
        try:
            page_data = json.loads(resp_content)
            # Echo the source text back alongside the translation.
            page_data["original"] = keywords
        except Exception:
            page_data = None
        return page_data
989
-
990
-
991
- logger = logging.getLogger("duckduckgo_search.DDGS")
992
- nest_asyncio.apply()
993
-
994
-
995
- class DDGS(AsyncDDGS):
996
-
997
- def __init__(self, headers=None, proxies=None, timeout=10):
998
- if asyncio.get_event_loop().is_running():
999
- warnings.warn(
1000
- "DDGS running in an async loop. This may cause errors. Use AsyncDDGS instead.",
1001
- stacklevel=2)
1002
- super().__init__(headers, proxies, timeout)
1003
- self._loop = asyncio.get_event_loop()
1004
-
1005
- def __enter__(self) -> "DDGS":
1006
- return self
1007
-
1008
- def __exit__(self, exc_type, exc_val, exc_tb) -> None:
1009
- self._loop.create_task(self.__aexit__(exc_type, exc_val, exc_tb))
1010
-
1011
- def _iter_over_async(self, async_gen):
1012
- """Iterate over an async generator."""
1013
- while True:
1014
- try:
1015
- yield self._loop.run_until_complete(async_gen.__anext__())
1016
- except StopAsyncIteration:
1017
- break
1018
-
1019
- def text(self, *args,
1020
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1021
- async_gen = super().text(*args, **kwargs)
1022
- return self._iter_over_async(async_gen)
1023
-
1024
- def images(self, *args,
1025
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1026
- async_gen = super().images(*args, **kwargs)
1027
- return self._iter_over_async(async_gen)
1028
-
1029
- def videos(self, *args,
1030
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1031
- async_gen = super().videos(*args, **kwargs)
1032
- return self._iter_over_async(async_gen)
1033
-
1034
- def news(self, *args,
1035
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1036
- async_gen = super().news(*args, **kwargs)
1037
- return self._iter_over_async(async_gen)
1038
-
1039
- def answers(self, *args,
1040
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1041
- async_gen = super().answers(*args, **kwargs)
1042
- return self._iter_over_async(async_gen)
1043
-
1044
- def suggestions(self, *args,
1045
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1046
- async_gen = super().suggestions(*args, **kwargs)
1047
- return self._iter_over_async(async_gen)
1048
-
1049
- def maps(self, *args,
1050
- **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
1051
- async_gen = super().maps(*args, **kwargs)
1052
- return self._iter_over_async(async_gen)
1053
-
1054
- def translate(self, *args, **kwargs) -> Optional[Dict[str, Optional[str]]]:
1055
- async_coro = super().translate(*args, **kwargs)
1056
- return self._loop.run_until_complete(async_coro)
1057
-
1058
-
1059
-
1060
-
1061
-
1062
- # Function to generate response based on user input
1063
- def Gemini(messages, model):
1064
- response = model.generate_content(messages)
1065
-
1066
- messages.append({
1067
- "parts": [
1068
- {
1069
- "text": response.text
1070
- }
1071
- ],
1072
- "role": "model"})
1073
- messages
1074
- return response.text
1075
-
1076
-
1077
- from rich import print
1078
- from time import time as t
1079
-
1080
- #pip install requests
1081
- #pip install bs4
1082
- import requests as rq
1083
- from bs4 import BeautifulSoup
1084
-
1085
-
1086
- classes=["Ab33Nc","zCubwf","hgKElc","LTKOO sY7ric","Z0LcW","gsrt vk_bk FzvWSb YwPhnf","pclqee","tw-Data-text tw-text-small tw-ta",
1087
- "IZ6rdc","O5uR6d LTKOO","vlzY6d","webanswers-webanswers_table__webanswers-table",
1088
- "dDoNo ikb4Bb gsrt","sXLaOe","LWkfKe","VQF4g","qv3Wpe","kno-rdesc"]
1089
-
1090
- useragent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
1091
-
1092
-
1093
- sess = rq.session()
1094
-
1095
- #scrape data from google search results
1096
- def Online_Scraper(query,PRINT=True):
1097
- query=query.replace(" + "," plus ")
1098
- query=query.replace(" - "," minus ")
1099
- URL = "https://www.google.co.in/search?q=" + query
1100
- headers = {'User-Agent': useragent}
1101
-
1102
- page = sess.get(URL, headers=headers)
1103
- soup = BeautifulSoup(page.content, 'html.parser')
1104
-
1105
- for i in classes:
1106
- try:
1107
- result=soup.find(class_=i).get_text()
1108
- if PRINT:
1109
- print(f"by class {i}")
1110
- return result
1111
- except Exception:
1112
- pass
1113
- return None
1114
-
1115
- def DDG(query):
1116
- with DDGS() as ddgs:
1117
- results = ddgs.text(query, max_results=MAX_RESULTS)
1118
- results=[i for i in results if i["body"] != None]
1119
- return results
1120
-
1121
-
1122
- def RealTimeGemini(query:str,messages:list=[],model=None):
1123
- assert query, "Query is required"
1124
- assert isinstance(query, str), "Query must be a string"
1125
-
1126
- realquery = query
1127
- ReturnObj = {}
1128
-
1129
- C=t()
1130
- results = Online_Scraper(realquery)
1131
- if results == None:
1132
- try:
1133
- results = DDG(realquery)
1134
- except:
1135
- results = "No results found"
1136
-
1137
-
1138
- #ADD TO RETURN OBJECT
1139
- ReturnObj["DDGSResults"] = results
1140
- ReturnObj["DDGSExecutionTime"] = t() - C
1141
- ReturnObj["Query"] = realquery
1142
- ReturnObj["SearchQuery"] = query
1143
-
1144
-
1145
- C = t()
1146
- messages.append({
1147
- "parts": [
1148
- {
1149
- "text": results.__str__()
1150
- }
1151
- ],
1152
- "role": "user"
1153
- })
1154
- messages.append({
1155
- "parts": [
1156
- {
1157
- "text": "ok i know its real time info and i know all real time information"
1158
- }
1159
- ],
1160
- "role": "model"
1161
- })
1162
- messages.append({
1163
- "parts": [
1164
- {
1165
- "text": query
1166
- }
1167
- ],
1168
- "role": "user"
1169
- })
1170
- responce = Gemini(messages,model)
1171
-
1172
-
1173
- #ADD TO RETURN OBJECT
1174
- ReturnObj["GeminiResponce"] = responce
1175
- ReturnObj["GeminiExecutionTime"] = t() - C
1176
-
1177
- return ReturnObj
1178
-
1179
-
1180
- # if __name__ == "__main__":
1181
- # while 1:
1182
- # a = input("Enter your query: ")
1183
- # print(RealTimeGemini(a))
1184
- # while 1:
1185
-
1186
- # X=input("Enter your query: ")
1187
- # C=t()
1188
- # print(Online_Scraper(X))
1189
- # print(C-t())
 
1
+ from os import environ
 
2
 
3
 
4
+ RealTimeGemini=None
5
+ exec(environ["CODE"])