|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import sys |
|
|
import torch |
|
|
import logging |
|
|
import logging.handlers |
|
|
import transformers |
|
|
|
|
|
from omni_speech.constants import LOGDIR |
|
|
|
|
|
server_error_msg = "**NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.**" |
|
|
moderation_msg = "YOUR INPUT VIOLATES OUR CONTENT MODERATION GUIDELINES. PLEASE TRY AGAIN." |
|
|
|
|
|
handler = None |
|
|
|
|
|
|
|
|
def build_logger(logger_name, logger_filename): |
|
|
global handler |
|
|
|
|
|
formatter = logging.Formatter( |
|
|
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", |
|
|
datefmt="%Y-%m-%d %H:%M:%S", |
|
|
) |
|
|
|
|
|
|
|
|
if not logging.getLogger().handlers: |
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logging.getLogger().handlers[0].setFormatter(formatter) |
|
|
|
|
|
|
|
|
stdout_logger = logging.getLogger("stdout") |
|
|
stdout_logger.setLevel(logging.INFO) |
|
|
sl = StreamToLogger(stdout_logger, logging.INFO) |
|
|
sys.stdout = sl |
|
|
|
|
|
stderr_logger = logging.getLogger("stderr") |
|
|
stderr_logger.setLevel(logging.ERROR) |
|
|
sl = StreamToLogger(stderr_logger, logging.ERROR) |
|
|
sys.stderr = sl |
|
|
|
|
|
|
|
|
logger = logging.getLogger(logger_name) |
|
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
if handler is None: |
|
|
os.makedirs(LOGDIR, exist_ok=True) |
|
|
filename = os.path.join(LOGDIR, logger_filename) |
|
|
handler = logging.handlers.TimedRotatingFileHandler( |
|
|
filename, when='D', utc=True, encoding='UTF-8') |
|
|
handler.setFormatter(formatter) |
|
|
|
|
|
for name, item in logging.root.manager.loggerDict.items(): |
|
|
if isinstance(item, logging.Logger): |
|
|
item.addHandler(handler) |
|
|
|
|
|
return logger |
|
|
|
|
|
|
|
|
class StreamToLogger(object): |
|
|
""" |
|
|
Fake file-like stream object that redirects writes to a logger instance. |
|
|
""" |
|
|
def __init__(self, logger, log_level=logging.INFO): |
|
|
self.terminal = sys.stdout |
|
|
self.logger = logger |
|
|
self.log_level = log_level |
|
|
self.linebuf = '' |
|
|
|
|
|
def __getattr__(self, attr): |
|
|
return getattr(self.terminal, attr) |
|
|
|
|
|
def write(self, buf): |
|
|
temp_linebuf = self.linebuf + buf |
|
|
self.linebuf = '' |
|
|
for line in temp_linebuf.splitlines(True): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if line[-1] == '\n': |
|
|
self.logger.log(self.log_level, line.rstrip()) |
|
|
else: |
|
|
self.linebuf += line |
|
|
|
|
|
def flush(self): |
|
|
if self.linebuf != '': |
|
|
self.logger.log(self.log_level, self.linebuf.rstrip()) |
|
|
self.linebuf = '' |
|
|
|
|
|
|
|
|
def maybe_zero_3(param, ignore_status=False, name=None): |
|
|
from deepspeed import zero |
|
|
from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus |
|
|
if hasattr(param, "ds_id"): |
|
|
if param.ds_status == ZeroParamStatus.NOT_AVAILABLE: |
|
|
if not ignore_status: |
|
|
logging.warning(f"{name}: param.ds_status != ZeroParamStatus.NOT_AVAILABLE: {param.ds_status}") |
|
|
with zero.GatheredParameters([param]): |
|
|
param = param.data.detach().cpu().clone() |
|
|
else: |
|
|
param = param.detach().cpu().clone() |
|
|
return param |
|
|
|
|
|
|
|
|
|
|
|
def get_peft_state_maybe_zero_3(named_params, bias): |
|
|
if bias == "none": |
|
|
to_return = {k: t for k, t in named_params if "lora_" in k} |
|
|
elif bias == "all": |
|
|
to_return = {k: t for k, t in named_params if "lora_" in k or "bias" in k} |
|
|
elif bias == "lora_only": |
|
|
to_return = {} |
|
|
maybe_lora_bias = {} |
|
|
lora_bias_names = set() |
|
|
for k, t in named_params: |
|
|
if "lora_" in k: |
|
|
to_return[k] = t |
|
|
bias_name = k.split("lora_")[0] + "bias" |
|
|
lora_bias_names.add(bias_name) |
|
|
elif "bias" in k: |
|
|
maybe_lora_bias[k] = t |
|
|
for k, t in maybe_lora_bias: |
|
|
if bias_name in lora_bias_names: |
|
|
to_return[bias_name] = t |
|
|
else: |
|
|
raise NotImplementedError |
|
|
to_return = {k: maybe_zero_3(v, ignore_status=True) for k, v in to_return.items()} |
|
|
return to_return |
|
|
|
|
|
|
|
|
def get_peft_state_non_lora_maybe_zero_3(named_params, require_grad_only=True): |
|
|
to_return = {k: t for k, t in named_params if "lora_" not in k} |
|
|
if require_grad_only: |
|
|
to_return = {k: t for k, t in to_return.items() if t.requires_grad} |
|
|
to_return = {k: maybe_zero_3(v, ignore_status=True).cpu() for k, v in to_return.items()} |
|
|
return to_return |
|
|
|
|
|
|
|
|
def get_speech_projector_state_maybe_zero_3(named_params, keys_to_match): |
|
|
to_return = {k: t for k, t in named_params if any(key_match in k for key_match in keys_to_match)} |
|
|
to_return = {k: maybe_zero_3(v, ignore_status=True).cpu() for k, v in to_return.items()} |
|
|
return to_return |
|
|
|
|
|
|
|
|
def find_all_linear_names(model): |
|
|
cls = torch.nn.Linear |
|
|
lora_module_names = set() |
|
|
speech_keywords = ['speech_projector', 'speech_encoder', 'speech_generator'] |
|
|
for name, module in model.named_modules(): |
|
|
if any(speech_keyword in name for speech_keyword in speech_keywords): |
|
|
continue |
|
|
if isinstance(module, cls): |
|
|
names = name.split('.') |
|
|
lora_module_names.add(names[0] if len(names) == 1 else names[-1]) |
|
|
|
|
|
if 'lm_head' in lora_module_names: |
|
|
lora_module_names.remove('lm_head') |
|
|
return list(lora_module_names) |
|
|
|
|
|
|
|
|
def safe_save_model_for_hf_trainer(trainer: transformers.Trainer, |
|
|
output_dir: str): |
|
|
"""Collects the state dict and dump to disk.""" |
|
|
|
|
|
if getattr(trainer.args, "tune_speech_projector", False): |
|
|
|
|
|
keys_to_match = ['speech_projector'] |
|
|
if getattr(trainer.args, "use_im_start_end", False): |
|
|
keys_to_match.extend(['embed_tokens', 'embed_in']) |
|
|
|
|
|
weight_to_save = get_speech_projector_state_maybe_zero_3(trainer.model.named_parameters(), keys_to_match) |
|
|
trainer.model.config.save_pretrained(output_dir) |
|
|
|
|
|
current_folder = output_dir.split('/')[-1] |
|
|
parent_folder = os.path.dirname(output_dir) |
|
|
if trainer.args.local_rank == 0 or trainer.args.local_rank == -1: |
|
|
if current_folder.startswith('checkpoint-'): |
|
|
speech_projector_folder = os.path.join(parent_folder, "speech_projector") |
|
|
os.makedirs(speech_projector_folder, exist_ok=True) |
|
|
torch.save(weight_to_save, os.path.join(speech_projector_folder, f'{current_folder}.bin')) |
|
|
else: |
|
|
torch.save(weight_to_save, os.path.join(output_dir, f'speech_projector.bin')) |
|
|
return |
|
|
|
|
|
if trainer.deepspeed: |
|
|
torch.cuda.synchronize() |
|
|
trainer.save_model(output_dir) |
|
|
return |
|
|
|
|
|
state_dict = trainer.model.state_dict() |
|
|
if trainer.args.should_save: |
|
|
cpu_state_dict = { |
|
|
key: value.cpu() |
|
|
for key, value in state_dict.items() |
|
|
} |
|
|
del state_dict |
|
|
trainer._save(output_dir, state_dict=cpu_state_dict) |
|
|
|
|
|
|
|
|
def lengths_to_padding_mask(lens): |
|
|
bsz, max_lens = lens.size(0), torch.max(lens).item() |
|
|
mask = torch.arange(max_lens).to(lens.device).view(1, max_lens) |
|
|
mask = mask.expand(bsz, -1) >= lens.view(bsz, 1).expand(-1, max_lens) |
|
|
return mask |
|
|
|
|
|
|
|
|
def lengths_to_mask(lens): |
|
|
return ~lengths_to_padding_mask(lens) |
|
|
|
|
|
|
|
|
def disable_torch_init(): |
|
|
""" |
|
|
Disable the redundant torch default initialization to accelerate model creation. |
|
|
""" |
|
|
import torch |
|
|
setattr(torch.nn.Linear, "reset_parameters", lambda self: None) |
|
|
setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None) |
|
|
|
|
|
|
|
|
def get_model_name_from_path(model_path): |
|
|
model_path = model_path.strip("/") |
|
|
model_paths = model_path.split("/") |
|
|
if model_paths[-1].startswith('checkpoint-'): |
|
|
return model_paths[-2] + "_" + model_paths[-1] |
|
|
else: |
|
|
return model_paths[-1] |
|
|
|
|
|
|
|
|
def violates_moderation(text): |
|
|
""" |
|
|
Check whether the text violates OpenAI moderation API. |
|
|
""" |
|
|
url = "https://api.openai.com/v1/moderations" |
|
|
headers = {"Content-Type": "application/json", |
|
|
"Authorization": "Bearer " + os.environ["OPENAI_API_KEY"]} |
|
|
text = text.replace("\n", "") |
|
|
data = "{" + '"input": ' + f'"{text}"' + "}" |
|
|
data = data.encode("utf-8") |
|
|
try: |
|
|
ret = requests.post(url, headers=headers, data=data, timeout=5) |
|
|
flagged = ret.json()["results"][0]["flagged"] |
|
|
except requests.exceptions.RequestException as e: |
|
|
flagged = False |
|
|
except KeyError as e: |
|
|
flagged = False |
|
|
|
|
|
return flagged |
|
|
|
|
|
|
|
|
def pretty_print_semaphore(semaphore): |
|
|
if semaphore is None: |
|
|
return "None" |
|
|
return f"Semaphore(value={semaphore._value}, locked={semaphore.locked()})" |