"""Command-line client that streams TTS audio from a server into ffplay/ffmpeg.

The script POSTs text plus a speaker description to ``{server_url}/tts_stream``
and pipes the streamed audio bytes to ``ffplay`` (playback) or ``ffmpeg``
(saving to a file).
"""

import argparse
import json
import shutil
import subprocess
import sys
import time
from typing import Iterable, Iterator, Optional

import requests


def is_installed(lib_name: str) -> bool:
    """Return True when an executable named *lib_name* is found on PATH."""
    return shutil.which(lib_name) is not None


def save(audio: bytes, filename: str) -> None:
    """Write the raw *audio* bytes to *filename*, overwriting any existing file."""
    with open(filename, "wb") as f:
        f.write(audio)


def stream_ffplay(
    audio_stream: Iterable[Optional[bytes]],
    output_file: Optional[str],
    save: bool = True,
) -> None:
    """Pipe an audio byte stream into ffplay (playback) or ffmpeg (save).

    Args:
        audio_stream: Iterable of audio chunks; ``None`` chunks are skipped.
        output_file: Destination path handed to ffmpeg when *save* is true.
        save: When true, convert the stream to *output_file* with ffmpeg;
            otherwise play it with ffplay. NOTE: this parameter shadows the
            module-level ``save()`` function inside this body; the name is
            kept for backward compatibility with existing callers.
    """
    if not save:
        ffplay_cmd = ["ffplay", "-nodisp", "-probesize", "1024", "-autoexit", "-"]
    else:
        print("Saving to ", output_file)
        ffplay_cmd = ["ffmpeg", "-probesize", "1024", "-i", "-", output_file]

    # Popen as a context manager closes stdin and waits for the child on exit,
    # even when the audio stream or a pipe write raises part-way through.
    with subprocess.Popen(ffplay_cmd, stdin=subprocess.PIPE) as ffplay_proc:
        for chunk in audio_stream:
            if chunk is not None:
                ffplay_proc.stdin.write(chunk)


def tts(text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
    """Stream synthesized audio for *text* from ``{server_url}/tts_stream``.

    Args:
        text: Text to synthesize.
        speaker: Mapping describing the speaker; it is sent as the JSON body
            together with the fields below. The mapping is not modified.
        language: Language code sent to the server.
        server_url: Base URL of the TTS server.
        stream_chunk_size: Sent as ``stream_chunk_size``; you can reduce it to
            get faster response, but degrade quality.

    Yields:
        Non-empty chunks of the response body.

    Timing information is printed to stderr. If the server does not answer
    with HTTP 200, the error body is printed and the process exits with
    status 1.
    """
    start = time.perf_counter()

    # Build a fresh payload so the caller's speaker mapping is left untouched.
    payload = {
        **speaker,
        "text": text,
        "language": language,
        "stream_chunk_size": stream_chunk_size,
    }

    # The context manager releases the streamed connection when the generator
    # finishes, exits early, or is closed by the consumer.
    with requests.post(
        f"{server_url}/tts_stream",
        json=payload,
        stream=True,
    ) as res:
        end = time.perf_counter()
        print(f"Time to make POST: {end-start}s", file=sys.stderr)

        if res.status_code != 200:
            print("Error:", res.text)
            sys.exit(1)

        first = True
        for chunk in res.iter_content(chunk_size=512):
            if first:
                end = time.perf_counter()
                print(f"Time to first chunk: {end-start}s", file=sys.stderr)
                first = False
            if chunk:
                yield chunk

        print("⏱️ response.elapsed:", res.elapsed)


def get_speaker(ref_audio, server_url):
    """Upload *ref_audio* to ``{server_url}/clone_speaker`` and return the JSON reply.

    Args:
        ref_audio: Path of the reference audio file to upload.
        server_url: Base URL of the TTS server.

    Returns:
        The decoded JSON body of the server response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # Keep the file open only for the duration of the upload.
    with open(ref_audio, "rb") as wav_file:
        files = {"wav_file": ("reference.wav", wav_file)}
        response = requests.post(f"{server_url}/clone_speaker", files=files)

    # Fail here rather than passing an error payload on as a speaker.
    response.raise_for_status()
    return response.json()


def main() -> None:
    """Parse command-line arguments and stream the synthesized speech."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--text",
        default="It took me quite a long time to develop a voice and now that I have it I am not going to be silent.",
        help="text input for TTS"
    )
    parser.add_argument(
        "--language",
        default="en",
        help="Language to use default is 'en' (English)"
    )
    parser.add_argument(
        "--output_file",
        default=None,
        help="Save TTS output to given filename"
    )
    parser.add_argument(
        "--ref_file",
        default=None,
        help="Reference audio file to use, when not given will use default"
    )
    parser.add_argument(
        "--server_url",
        default="http://localhost:7860",
        help="Server url http://localhost:7860 default, change to your server location "
    )
    parser.add_argument(
        "--stream_chunk_size",
        default="20",
        help="Stream chunk size , 20 default, reducing will get faster latency but may degrade quality"
    )
    args = parser.parse_args()

    # Only read the bundled default speaker when no reference audio is given,
    # so a missing default file cannot break a run that does not need it.
    if args.ref_file is not None:
        print("Computing the latents for a new reference...")
        speaker = get_speaker(args.ref_file, args.server_url)
    else:
        with open("./default_speaker.json", "r", encoding="utf-8") as file:
            speaker = json.load(file)

    stream_ffplay(
        tts(
            args.text,
            speaker,
            args.language,
            args.server_url,
            args.stream_chunk_size
        ),
        args.output_file,
        save=bool(args.output_file)
    )


if __name__ == "__main__":
    main()