import json
import os
import re
import tempfile

import librosa
import numpy as np
import torch
from huggingface_hub import hf_hub_download
from torchaudio.models.decoder import ctc_decoder
from transformers import Wav2Vec2ForCTC, AutoProcessor
uroman_dir = "uroman"
assert os.path.exists(uroman_dir)
UROMAN_PL = os.path.join(uroman_dir, "bin", "uroman.pl")

ASR_SAMPLING_RATE = 16_000

# Beam-search decoder defaults; different values apply depending on whether an
# external language model is supplied.
WORD_SCORE_DEFAULT_IF_LM = -0.18
WORD_SCORE_DEFAULT_IF_NOLM = -3.5
LM_SCORE_DEFAULT = 1.48

MODEL_ID = "upload/mms_zs"

processor = AutoProcessor.from_pretrained(MODEL_ID)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)

token_file = "upload/mms_zs/tokens.txt"
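
# tokens.txt lists the decoder's token set, one token per line, lined up with the
# model's output vocabulary; it is expected to contain the word-boundary token "|"
# used in the lexicon spellings and the blank token "<s>" passed to the decoder below.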
def error_check_file(filepath):
    if not isinstance(filepath, str):
        return "Expected file to be of type 'str'. Instead got {}".format(
            type(filepath)
        )
    if not os.path.exists(filepath):
        return "Input file '{}' doesn't exist".format(filepath)
def norm_uroman(text):
    # Lowercase, normalize curly apostrophes, and keep only [a-z' ] characters.
    text = text.lower()
    text = text.replace("’", "'")
    text = re.sub("([^a-z' ])", " ", text)
    text = re.sub(" +", " ", text)
    return text.strip()
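
# For example, norm_uroman("Don’t stop!") returns "don't stop".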
def uromanize(words):
    iso = "xxx"
    with tempfile.NamedTemporaryFile() as tf, tempfile.NamedTemporaryFile() as tf2:
        with open(tf.name, "w") as f:
            f.write("\n".join(words))
        cmd = "perl " + UROMAN_PL
        cmd += f" -l {iso} "
        cmd += f" < {tf.name} > {tf2.name}"
        os.system(cmd)
        lexicon = {}
        with open(tf2.name) as f:
            for idx, line in enumerate(f):
                if not line.strip():
                    continue
                line = re.sub(r"\s+", " ", norm_uroman(line)).strip()
                # Space-separate the romanized characters and append the
                # word-boundary token "|" to form the lexicon spelling.
                lexicon[words[idx]] = " ".join(line) + " |"
    return lexicon
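
# Assuming uroman passes ASCII input through unchanged, uromanize(["hello"])
# returns {"hello": "h e l l o |"}: each character becomes one token and "|"
# marks the word boundary.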
def load_lexicon(filepath):
    words = {}
    with open(filepath) as f:
        for line in f:
            line = line.strip()
            # Ignore empty, multi-word, or overly long lines.
            if not line or " " in line or len(line) > 50:
                continue
            for w in line.split():
                words[w.lower()] = True
    return uromanize(list(words.keys()))
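
# Given a words file with one entry per line ("the", "of", ...), load_lexicon
# returns a spelling dict such as {"the": "t h e |", "of": "o f |", ...}
# (exact spellings depend on uroman's romanization).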
def process(
    audio_data,
    words_file,
    lm_path=None,
    wscore=None,
    lmscore=None,
    wscore_usedefault=True,
    lmscore_usedefault=True,
):
    if isinstance(audio_data, tuple):
        # Microphone input: (sample_rate, int16 samples); scale to float32 in [-1, 1].
        sr, audio_samples = audio_data
        audio_samples = (audio_samples / 32768.0).astype(np.float32)
        assert sr == ASR_SAMPLING_RATE, "Invalid sampling rate"
    else:
        # File upload: load and resample to the model's expected rate.
        assert isinstance(audio_data, str)
        audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0]

    print("len audio_samples", len(audio_samples))
    lang_code = "eng"
    # Per-language adapter loading is disabled; the zero-shot checkpoint is used as-is:
    # processor.tokenizer.set_target_lang(lang_code)
    # model.load_adapter(lang_code)

    inputs = processor(
        audio_samples, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt"
    )
    print("inputs type", type(inputs))
    # Set device: prefer CUDA, then MPS, falling back to CPU.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif (
        hasattr(torch.backends, "mps")
        and torch.backends.mps.is_available()
        and torch.backends.mps.is_built()
    ):
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    model.to(device)
    inputs = inputs.to(device)
| # print("I'm here 122") | |
| with torch.no_grad(): | |
| outputs = model(**inputs).logits | |
| # Setup lexicon and decoder | |
| # print("before uroman") | |
| lexicon = load_lexicon(words_file) | |
| # print("after uroman") | |
| # print("len lexicon", len(lexicon)) | |
    with tempfile.NamedTemporaryFile() as lexicon_file:
        with open(lexicon_file.name, "w") as f:
            idx = 0
            for word, spelling in lexicon.items():
                f.write(word + " " + spelling + "\n")
                # Periodically log an entry so the lexicon can be sanity-checked.
                if idx % 100 == 0:
                    print(word, spelling, flush=True)
                idx += 1

        if wscore_usedefault:
            wscore = (
                WORD_SCORE_DEFAULT_IF_LM
                if lm_path is not None
                else WORD_SCORE_DEFAULT_IF_NOLM
            )
        if lmscore_usedefault:
            lmscore = LM_SCORE_DEFAULT if lm_path is not None else 0
        beam_search_decoder = ctc_decoder(
            lexicon=lexicon_file.name,
            tokens=token_file,
            lm=None,  # no external LM; lm_path only selects the score defaults above
            nbest=1,
            beam_size=500,
            beam_size_token=50,
            lm_weight=lmscore,
            word_score=wscore,
            sil_score=0,
            blank_token="<s>",
        )
        # Decoding runs on CPU; the best hypothesis is result[0][0].
        beam_search_result = beam_search_decoder(outputs.to("cpu"))
        transcription = " ".join(beam_search_result[0][0].words).strip()

    return transcription
ZS_EXAMPLES = [["upload/english.mp3", "upload/words_top10k.txt"]]

print(process("upload/english.mp3", "upload/words_top10k.txt"))
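
# Minimal sketch of the microphone path, assuming a 16 kHz int16 clip such as
# the (sample_rate, samples) tuples produced by audio widgets; the silent clip
# below is purely illustrative:
# mic_clip = (ASR_SAMPLING_RATE, np.zeros(ASR_SAMPLING_RATE, dtype=np.int16))
# print(process(mic_clip, "upload/words_top10k.txt"))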