from atexit import register as exit_register from os import close, read, name as os_name from pathlib import Path from pty import openpty from re import compile, search from shutil import move from subprocess import PIPE, Popen, STDOUT, check_output, CalledProcessError from sys import stdout from time import sleep, time from urllib.parse import unquote, urlparse from requests import get as get_url, head as get_head from requests.structures import CaseInsensitiveDict WORK_FOLDER = Path('/content') zrok_bin = WORK_FOLDER / 'zrok' zrok_fallback_tokens = [ 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io', ] claudflare_bin = WORK_FOLDER / 'claudflared' tmole_bin = WORK_FOLDER / 'tmole' tunwg_bin = WORK_FOLDER / 'tunwg' go_localt_bin = WORK_FOLDER / 'go_localt' gradio_bin = WORK_FOLDER / 'frpc_linux_amd64' colab_native_url = WORK_FOLDER / 'colab_url.txt' links_file = WORK_FOLDER / 'links.txt' def is_list(variable) -> bool: return isinstance(variable, list) def is_str(variable) -> bool: return isinstance(variable, str) def determine_archive_format(filepath: str | Path) -> str | None: filepath = Path(filepath) zip_signature = bytes([0x50, 0x4B, 0x03, 0x04]) seven_z_signature = bytes([0x37, 0x7A, 0xBC, 0xAF, 0x27, 0x1C]) lzma_xz_signature = bytes([0xFD, 0x37, 0x7A, 0x58, 0x5A]) tgz_signature = bytes([0x1F, 0x8B]) tbz_signature = bytes([0x42, 0x5A, 0x68]) ustar_signature = bytes([0x75, 0x73, 0x74, 0x61, 0x72]) with filepath.open('rb') as file: header = file.read(262) if header.startswith(zip_signature): return 'zip' elif header.startswith(seven_z_signature) or header.startswith(lzma_xz_signature): return '7z' elif header.startswith(tgz_signature): return 'tar.gz' elif header.startswith(tbz_signature): return 'tar.bz2' elif header[0x101:0x101 + len(ustar_signature)] == ustar_signature: return 'tar' return None def unpack_archive(archive_path: str | Path, dest_path: str | Path, rm_archive: bool = True): if str(archive_path).startswith(('https://', 'http://')): archive_path = download(archive_path, save_path=WORK_FOLDER, progress=False) if not Path(archive_path).exists(): raise RuntimeError(f'архив {archive_path} не найден.') archive_path = Path(archive_path) dest_path = Path(dest_path) if not archive_path.exists(): raise RuntimeError(f'архив {archive_path} не найден.') determine_format = determine_archive_format(archive_path) try: if determine_format == '7z' or archive_path.suffix == '.7z': run(f'7z -bso0 -bd -slp -y x {str(archive_path)} -o{str(dest_path)}') archive_path.unlink(missing_ok=True) if rm_archive else None elif determine_format == 'tar' or archive_path.suffix in ['.tar']: run(f'tar -xvpf {str(archive_path)} -C {str(dest_path)}') archive_path.unlink(missing_ok=True) if rm_archive else None elif determine_format == 'tar.gz' or archive_path.suffix in ['.tar.gz', '.tar.bz2', '.tar.xz']: result = run(f'tar -xvzpf {str(archive_path)} -C {str(dest_path)}') if result['status_code'] != 0: run(f'gzip -d {str(archive_path)}', dest_path) archive_path.unlink(missing_ok=True) if rm_archive else None elif determine_format == 'zip' or archive_path.suffix == '.zip': run(f'unzip {str(archive_path)} -d {str(dest_path)}') archive_path.unlink(missing_ok=True) if rm_archive else None else: run(f'7z -bso0 -bd -slp -y x {str(archive_path)} -o{str(dest_path)}') archive_path.unlink(missing_ok=True) if rm_archive else None except: try: run(f'7z -bso0 -bd -mmt4 -slp -y x {str(archive_path)} -o{str(dest_path)}') archive_path.unlink(missing_ok=True) if rm_archive else None except: raise RuntimeError( f'формат архива {archive_path.suffix} не определен, нужно задать формат в "determine_format".') def run(command: str, cwd: str | Path | None = None, live_output: bool = False) -> dict: process = Popen(command, shell=True, cwd=cwd, stdout=PIPE, stderr=STDOUT) encodings = ['iso-8859-5', 'windows-1251', 'iso-8859-1', 'cp866', 'koi8-r', 'mac_cyrillic'] is_progress_bar_pattern = r'\d+%|\d+/\d+' def decode_output(output_data): for encoding in encodings: try: return output_data.decode(encoding) except UnicodeDecodeError: continue return output_data.decode('utf-8', errors='replace') final_output = [] last_progress_bar = '' def process_output(): nonlocal last_progress_bar for line in iter(process.stdout.readline, b''): try: line_decoded = line.decode('utf-8').strip() except UnicodeDecodeError: line_decoded = decode_output(line.strip()) if search(is_progress_bar_pattern, line_decoded): last_progress_bar = line_decoded if live_output: stdout.write('\r' + line_decoded) else: final_output.append(line_decoded) if live_output: stdout.write('\n' + line_decoded) if live_output: stdout.flush() process_output() process.wait() if last_progress_bar: final_output.append(last_progress_bar) return { 'status_code': process.returncode, 'output': '\n'.join(final_output) } def is_valid_url(url: str) -> bool: headers = None for attempt in range(2): try: with get_url(url, headers=headers, allow_redirects=True, stream=True) as response: if 200 <= response.status_code < 300: return True except: try: response = get_head(url, headers=headers, allow_redirects=True) if 200 <= response.status_code < 300: return True except: pass else: break return False def get_filename_from_headers(requests_headers: CaseInsensitiveDict) -> str | None: content_disposition = requests_headers.get('content-disposition') if not content_disposition: return requests_headers.get('filename') parts = content_disposition.split(';') filename = None for part in parts: part = part.strip() if part.startswith('filename*='): encoding, _, encoded_filename = part[len('filename*='):].partition("''") filename = unquote(encoded_filename, encoding=encoding) break elif part.startswith('filename='): filename = part[len('filename='):].strip('"') break return filename def download(url: str, filename: str | Path | None = None, save_path: str | Path | None = None, progress: bool = True) -> Path | None: headers = None url_with_header = url.replace('"', '').replace("'", '').split('--header=') if len(url_with_header) > 1: url = (url_with_header[0]).strip() header = url_with_header[1] headers = { header.split(':')[0].strip(): header.split(':')[1].strip() } if is_valid_url(url): save_path = Path(save_path) if save_path else Path.cwd() save_path.mkdir(parents=True, exist_ok=True) with get_url(url, stream=True, allow_redirects=True, headers=headers) as request: file_size = int(request.headers.get('content-length', 0)) file_name = filename or get_filename_from_headers(request.headers) or Path(urlparse(request.url).path).name file_path = save_path / file_name chunk_size = max(4096, file_size // 2000) downloaded_size = 0 try: with open(file_path, 'wb') as fp: start = time() for chunk in request.iter_content(chunk_size=chunk_size): if chunk: fp.write(chunk) if progress: downloaded_size += len(chunk) percent_completed = downloaded_size / file_size * 100 elapsed_time = time() - start print(f'\rзагрузка {file_name}: {percent_completed:.2f}% | {elapsed_time:.2f} сек.', end='') except Exception as e: raise RuntimeError(f'не удалось загрузить файл по ссылке {url}:\n{e}') return file_path else: raise RuntimeError(f'недействительная ссылка на файл: {url}') def is_process_running(process_name: str | Path) -> bool: try: output = check_output(['pgrep', '-f', str(process_name)], text=True) return len(output.strip().split('\n')) > 0 except CalledProcessError: return False def move_path(old_path: Path | str, new_path: Path | str): old, new = Path(old_path), Path(new_path) if old_path.exists(): try: old_path.replace(new) except: try: move(old, new) except: if os_name == 'posix': run(f'mv "{old}" "{new}"') else: run(f'move "{old}" "{new}"') else: raise RuntimeError(f'не найден исходный путь для перемещения: {old}') def terminate_process(process_name: str, process_obj: Popen): process_obj.stdout.close() process_obj.stderr.close() process_obj.terminate() run(f'pkill -f {process_name}') def is_ipv4(address: str) -> bool: ipv4_pattern = compile(r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$') if ipv4_pattern.match(address): return True else: return False def get_revproxy_url(bin_url: str, need_unpack: bool, bin_path: Path, start_commands: list, read_from_stderr: bool, lines_to_read: int, url_pattern: str = r'https://\S+', write_link: bool = False) -> str: if not bin_path.exists(): files_before = {item.name for item in WORK_FOLDER.iterdir() if item.is_file()} if need_unpack: unpack_archive(download(bin_url, progress=False), WORK_FOLDER, rm_archive=True) else: download(bin_url, save_path=bin_path.parent, progress=False) files_after = {item.name for item in WORK_FOLDER.iterdir() if item.is_file()} new_files = files_after - files_before if len(new_files) == 1: unpacked_file = WORK_FOLDER / new_files.pop() move_path(unpacked_file, bin_path) else: unpacked_files = ', '.join(new_files) raise RuntimeError(f'ошибка при определении нового файла после распаковки!\n' f'ожидалось что за время работы добавится один файл (распакованный бинарник),\n' f'а добавилсь эти: {unpacked_files}') bin_path.chmod(0o777) if is_process_running(bin_path.name): run(f'pkill -f {bin_path.name}') process = Popen(start_commands, stdout=PIPE, stderr=PIPE, text=True) lines = [] stream = process.stderr if read_from_stderr else process.stdout for _ in range(lines_to_read): line = stream.readline() # print(f'# {line}') if len(lines) == lines_to_read or not line: break lines.append(line.strip()) ipv4 = next((line for line in lines if is_ipv4(line)), None) urls = [search(url_pattern, url).group() for url in lines if search(url_pattern, url)] if urls: exit_register(terminate_process, bin_path.name, process) if write_link: links_file.write_text(urls[0]) return f'{urls[0]}\n пароль(IP): {ipv4}' if ipv4 else urls[0] else: run(f'pkill -f {bin_path.name}') output = '\n'.join(lines) raise RuntimeError(f'ссылку получить не удалось, вывод работы бинарника:\n{output}') def get_tmole_url(port: int) -> str: return get_revproxy_url( bin_url='https://tunnelmole.com/downloads/tmole-linux.gz', need_unpack=True, bin_path=Path(tmole_bin), start_commands=[str(Path(tmole_bin)), str(port)], read_from_stderr=False, lines_to_read=5 ) def get_tunwg_url(port: int) -> str: return get_revproxy_url( bin_url='https://github.com/ntnj/tunwg/releases/latest/download/tunwg', need_unpack=False, bin_path=Path(tunwg_bin), start_commands=[f'{tunwg_bin}', f'--forward=http://127.0.0.1:{port}'], read_from_stderr=True, lines_to_read=2 ) def get_cloudflared_url(port: int) -> str: return get_revproxy_url( bin_url='https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64', need_unpack=False, bin_path=Path(claudflare_bin), start_commands=[f'{claudflare_bin}', 'tunnel', '--url', f'http://127.0.0.1:{port}'], read_from_stderr=True, lines_to_read=6, url_pattern=r'(?Phttps?://\S+\.trycloudflare\.com)' ) def get_localt_url(port: int) -> str: return get_revproxy_url( bin_url='https://huggingface.co/prolapse/go_localt/resolve/main/go_localt', need_unpack=False, bin_path=Path(go_localt_bin), start_commands=[f'{go_localt_bin}', f'{port}'], read_from_stderr=False, lines_to_read=2 ) def zrok_token_is_valid(zrok_token: str) -> bool: try: run(f'./{zrok_bin.name} disable', cwd=zrok_bin.parent) except: pass master, slave = openpty() process = Popen(f'./{zrok_bin.name} enable {zrok_token}', shell=True, cwd=zrok_bin.parent, stdin=slave, stdout=slave, stderr=slave, text=True) close(slave) output = [] try: while True: try: data = read(master, 1024) except OSError: break if not data: break output.append(data.decode('utf-8')) except: pass close(master) process.wait() full_output = ''.join(output) if 'successfully enabled' in full_output: return True return False def get_zrok_token() -> str: for token in zrok_fallback_tokens: if zrok_token_is_valid(token): zrok_token = token break else: raise RuntimeError('не удалось найти валидный zrok-токен.') return zrok_token def get_zrok_url(port: int) -> str: zrok_bin_url = 'https://huggingface.co/prolapse/zrok/resolve/main/zrok.tar' if not zrok_bin.exists(): unpack_archive(download(zrok_bin_url, progress=False), WORK_FOLDER, rm_archive=True) get_zrok_token() com = f'{zrok_bin} share public http://localhost:{port}/ --headless'.split(' ') return get_revproxy_url( bin_url=zrok_bin_url, need_unpack=True, bin_path=Path(zrok_bin), start_commands=com, read_from_stderr=True, lines_to_read=2, url_pattern=r'(?Phttps?://\S+\.share\.zrok\.io)' ) def get_gradio_url(port: int) -> str | None: max_attempts = 3 for attempt in range(max_attempts): try: response = get_url('https://api.gradio.app/v2/tunnel-request') if response and response.status_code == 200: remote_host, remote_port = response.json()[0]['host'], int(response.json()[0]['port']) com = [f'{gradio_bin}', 'http', '-n', 'random', '-l', f'{port}', '-i', '127.0.0.1', '--uc', '--sd', 'random', '--ue', '--server_addr', f'{remote_host}:{remote_port}', '--disable_log_color'] return get_revproxy_url( bin_url='https://cdn-media.huggingface.co/frpc-gradio-0.1/frpc_linux_amd64', need_unpack=False, bin_path=Path(gradio_bin), start_commands=com, read_from_stderr=False, lines_to_read=3, ) except Exception as e: if attempt < max_attempts - 1: print(f'попытка {attempt + 1} получить ссылку градио провалилась: {e}. пробуем еще...') sleep(5) return None else: raise RuntimeError(f'после трех попыток поднять туннель градио не удалось: {e}') return None proxies_functions = { 'zrok': get_zrok_url, 'tmole': get_tmole_url, 'tunwg': get_tunwg_url, 'cloudflared': get_cloudflared_url, 'localt': get_localt_url, 'gradio': get_gradio_url } def try_all(port: int) -> str: results = [] for proxy_name, proxy_func in proxies_functions.items(): try: url = proxy_func(port) results.append(url) except Exception as e: print(f'{proxy_name}: {e}') return '\n'.join(results) def get_share_link(host: str, port: int) -> str: if host in proxies_functions: return proxies_functions[host](port) elif host == 'all': return try_all(port) else: available_proxies_functions = ', '.join([func.__name__ for func in proxies_functions.values()]) return f'у меня нет функции ассоциированной с {host}, доступные функции:\n{available_proxies_functions}' # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # """ # как использовать: port = 7860 # порт, на котором запущен веб-интерфейс # чтобы использовать определенный прокси-сервер, нужно передать его название как первый аргумент: link = get_share_link('gradio', port) print(link) link = get_share_link('zrok', port) print(link) link = get_share_link('cloudflared', port) print(link) link = get_share_link('tmole', port) print(link) link = get_share_link('tunwg', port) print(link) link = get_share_link('localt', port) print(link) # или все сразу (не рекомендуется): all_possible_links = get_share_link('all', port) print(all_possible_links) """