# -------------------------------------------------------- # Pre-Training Transformer Decoder for End-to-End ASR Model with Unpaired Speech Data (https://arxiv.org/abs/2203.17113) # Github source: https://github.com/microsoft/SpeechT5/tree/main/Speech2C # Copyright (c) 2022 Microsoft # Licensed under The MIT License [see LICENSE for details] # Based on fairseq code bases # https://github.com/pytorch/fairseq # -------------------------------------------------------- import math import re from dataclasses import dataclass, field import torch import torch.nn.functional as F from fairseq import metrics, utils from fairseq.criterions import FairseqCriterion, register_criterion from fairseq.criterions.label_smoothed_cross_entropy import label_smoothed_nll_loss from fairseq.criterions.hubert_criterion import HubertCriterionConfig @dataclass class Speech2cCriterionConfig(HubertCriterionConfig): dec_weight: float = field( default=1.0, metadata={"help": "weights for decoder CE Loss, loss will be (hubert_loss + dec_weight * CE_Loss)"}, ) report_accuracy: bool = field( default=True, metadata={"help": "report decoder accuracy metric"}, ) ignore_prefix_size: int = field( default=0, metadata={"help": "Ignore first N tokens"}, ) label_smoothing: float = field( default=0.0, metadata={"help": "epsilon for label smoothing, 0 means no label smoothing"}, ) @register_criterion("speech2c", dataclass=Speech2cCriterionConfig) class Speech2cCriterion(FairseqCriterion): def __init__(self, task, pred_masked_weight, pred_nomask_weight, loss_weights=None, log_keys=None, dec_weight=1.0, report_accuracy=False, ignore_prefix_size=0, label_smoothing=0.0): super().__init__(task) self.pred_masked_weight = pred_masked_weight self.pred_nomask_weight = pred_nomask_weight self.loss_weights = loss_weights self.log_keys = [] if log_keys is None else log_keys self.dec_weight = dec_weight self.report_accuracy = report_accuracy self.ignore_prefix_size = ignore_prefix_size self.eps = label_smoothing self.padding_idx = task.dictionaries[0].pad() def forward(self, model, sample, reduce=True, log_pred=False): """Compute the loss for the given sample. Returns a tuple with three elements: 1) the loss 2) the sample size, which is used as the denominator for the gradient 3) logging outputs to display while training """ net_output = model(target_list=sample["target_list"], **sample["net_input"]) loss = 0.0 sample_size = 0 logging_output = {} reduction = "sum" if reduce else "none" loss_m_list = [] logp_m_list = model.get_logits(net_output, True) targ_m_list = model.get_targets(net_output, True) assert self.pred_masked_weight == 0 or len(logp_m_list) > 0 for i, (logp_m, targ_m) in enumerate(zip(logp_m_list, targ_m_list)): loss_m = F.cross_entropy(logp_m, targ_m, reduction=reduction) loss_m_list.append(loss_m) logging_output[f"loss_m_{i}"] = loss_m.detach().item() if self.pred_masked_weight > 0: loss += self.pred_masked_weight * sum(loss_m_list) sample_size += targ_m_list[0].numel() loss_u_list = [] logp_u_list = model.get_logits(net_output, False) targ_u_list = model.get_targets(net_output, False) assert self.pred_nomask_weight == 0 or len(logp_u_list) > 0 for i, (logp_u, targ_u) in enumerate(zip(logp_u_list, targ_u_list)): loss_u = F.cross_entropy(logp_u, targ_u, reduction=reduction) loss_u_list.append(loss_u) logging_output[f"loss_u_{i}"] = loss_u.detach().item() if self.pred_nomask_weight > 0: loss += self.pred_nomask_weight * sum(loss_u_list) sample_size += targ_u_list[0].numel() if self.loss_weights is not None: assert hasattr(model, "get_extra_losses") extra_losses, names = model.get_extra_losses(net_output) if torch.is_tensor(extra_losses): extra_losses = [extra_losses] names = [names] if len(self.loss_weights) == 1 and len(extra_losses) != 1: self.loss_weights = [self.loss_weights[0]] * len(extra_losses) assert len(extra_losses) == len( self.loss_weights ), f"{len(extra_losses)}, {len(self.loss_weights)}" for p, n, coef in zip(extra_losses, names, self.loss_weights): if coef != 0 and p is not None: p = coef * p.float() * sample_size loss += p logging_output[f"loss_{n}"] = p.item() if "decoder_target" in sample: dec_sample_size = sample["dec_ntokens"] dec_loss, dec_nll_loss = self.compute_ce_loss(model, net_output["decoder_out"], sample, reduce=reduce) loss = loss + (self.dec_weight * dec_loss * sample_size / dec_sample_size) logging_output["dec_loss"] = dec_loss.item() logging_output["dec_nll_loss"] = dec_nll_loss.item() logging_output["dec_sample_size"] = dec_sample_size if self.report_accuracy: n_correct, total = self.compute_accuracy(model, net_output["decoder_out"], sample) logging_output["dec_n_correct"] = utils.item(n_correct.data) logging_output["total"] = utils.item(total.data) logging_output = { "loss": loss.item() if reduce else loss, "ntokens": sample_size, "nsentences": sample["id"].numel(), "sample_size": sample_size, **logging_output, } for lk in self.log_keys: if lk in net_output: logging_output[lk] = float((net_output[lk])) def compute_correct(logits): if logits.numel() == 0: return 0, 0 else: assert logits.dim() > 1, logits.shape max = logits.argmax(-1) == 0 min = logits.argmin(-1) == 0 both = max & min corr = max.long().sum().item() - both.long().sum().item() count = max.numel() return corr, count with torch.no_grad(): for i, logp_m in enumerate(logp_m_list): corr_m, count_m = compute_correct(logp_m) logging_output[f"correct_m_{i}"] = corr_m logging_output[f"count_m_{i}"] = count_m for i, logp_u in enumerate(logp_u_list): corr_u, count_u = compute_correct(logp_u) logging_output[f"correct_u_{i}"] = corr_u logging_output[f"count_u_{i}"] = count_u return loss, sample_size, logging_output def compute_ce_loss(self, model, net_output, sample, reduce=True): lprobs, target = self.get_lprobs_and_target(model, net_output, sample) loss, nll_loss = label_smoothed_nll_loss( lprobs, target, self.eps, ignore_index=self.padding_idx, reduce=reduce, ) return loss, nll_loss def compute_accuracy(self, model, net_output, sample): lprobs, target = self.get_lprobs_and_target(model, net_output, sample) mask = target.ne(self.padding_idx) n_correct = torch.sum( lprobs.argmax(1).masked_select(mask).eq(target.masked_select(mask)) ) total = torch.sum(mask) return n_correct, total def get_lprobs_and_target(self, model, net_output, sample): lprobs = model.get_normalized_probs(net_output, log_probs=True) target = sample["decoder_target"] if self.ignore_prefix_size > 0: if getattr(lprobs, "batch_first", False): lprobs = lprobs[:, self.ignore_prefix_size :, :].contiguous() target = target[:, self.ignore_prefix_size :].contiguous() else: lprobs = lprobs[self.ignore_prefix_size :, :, :].contiguous() target = target[self.ignore_prefix_size :, :].contiguous() return lprobs.view(-1, lprobs.size(-1)), target.view(-1) @staticmethod def reduce_metrics(logging_outputs) -> None: """Aggregate logging outputs from data parallel training (copied from normal cross entropy).""" loss_sum = sum(log.get("loss", 0) for log in logging_outputs) ntokens = sum(log.get("ntokens", 0) for log in logging_outputs) sample_size = sum(log.get("sample_size", 0) for log in logging_outputs) metrics.log_scalar("loss", loss_sum / sample_size / math.log(2), sample_size, round=3) if sample_size != ntokens: metrics.log_scalar("nll_loss", loss_sum / ntokens / math.log(2), ntokens, round=3) metrics.log_derived("ppl", lambda meters: utils.get_perplexity(meters["nll_loss"].avg)) else: metrics.log_derived("ppl", lambda meters: utils.get_perplexity(meters["loss"].avg)) counts = {} for lk in logging_outputs[0].keys(): if lk.startswith("count_"): val = sum(log[lk] for log in logging_outputs) metrics.log_scalar(lk, val) counts[lk] = val for lk in logging_outputs[0].keys(): if lk.startswith("loss_"): val = sum(log[lk] for log in logging_outputs) metrics.log_scalar(lk, val / sample_size / math.log(2), round=3) elif lk.startswith("correct_"): val = sum(log[lk] for log in logging_outputs) metrics.log_scalar(lk, val / counts[re.sub("correct", "count", lk)]) if "dec_loss" in logging_outputs[0]: dec_loss_sum = sum(log.get("dec_loss", 0) for log in logging_outputs) dec_nll_loss_sum = sum(log.get("dec_nll_loss", 0) for log in logging_outputs) dec_sample_size = sum(log.get("dec_sample_size", 0) for log in logging_outputs) metrics.log_scalar( "dec_loss", dec_loss_sum / dec_sample_size / math.log(2), dec_sample_size, round=3 ) metrics.log_scalar( "dec_nll_loss", dec_nll_loss_sum / dec_sample_size / math.log(2), dec_sample_size, round=3 ) metrics.log_derived( "dec_ppl", lambda meters: utils.get_perplexity(meters["dec_nll_loss"].avg) ) total = utils.item(sum(log.get("total", 0) for log in logging_outputs)) if total > 0: metrics.log_scalar("total", total) n_correct = utils.item( sum(log.get("dec_n_correct", 0) for log in logging_outputs) ) metrics.log_scalar("dec_n_correct", n_correct) metrics.log_derived( "dec_accuracy", lambda meters: round( meters["dec_n_correct"].sum * 100.0 / meters["total"].sum, 3 ) if meters["total"].sum > 0 else float("nan"), ) @staticmethod def aggregate_logging_outputs(logging_outputs): """Aggregate logging outputs from data parallel training.""" raise NotImplementedError() @staticmethod def logging_outputs_can_be_summed() -> bool: """ Whether the logging outputs returned by `forward` can be summed across workers prior to calling `reduce_metrics`. Setting this to True will improves distributed training speed. """ return False