|
"Implements [mixup](https://arxiv.org/abs/1710.09412) training method" |
|
from ..torch_core import * |
|
from ..callback import * |
|
from ..basic_train import Learner, LearnerCallback |
|
|
|
class MixUpCallback(LearnerCallback): |
|
"Callback that creates the mixed-up input and target." |
|
def __init__(self, learn:Learner, alpha:float=0.4, stack_x:bool=False, stack_y:bool=True): |
|
super().__init__(learn) |
|
self.alpha,self.stack_x,self.stack_y = alpha,stack_x,stack_y |
|
|
|
def on_train_begin(self, **kwargs): |
|
if self.stack_y: self.learn.loss_func = MixUpLoss(self.learn.loss_func) |
|
|
|
def on_batch_begin(self, last_input, last_target, train, **kwargs): |
|
"Applies mixup to `last_input` and `last_target` if `train`." |
|
if not train: return |
|
lambd = np.random.beta(self.alpha, self.alpha, last_target.size(0)) |
|
lambd = np.concatenate([lambd[:,None], 1-lambd[:,None]], 1).max(1) |
|
lambd = last_input.new(lambd) |
|
shuffle = torch.randperm(last_target.size(0)).to(last_input.device) |
|
x1, y1 = last_input[shuffle], last_target[shuffle] |
|
if self.stack_x: |
|
new_input = [last_input, last_input[shuffle], lambd] |
|
else: |
|
out_shape = [lambd.size(0)] + [1 for _ in range(len(x1.shape) - 1)] |
|
new_input = (last_input * lambd.view(out_shape) + x1 * (1-lambd).view(out_shape)) |
|
if self.stack_y: |
|
new_target = torch.cat([last_target[:,None].float(), y1[:,None].float(), lambd[:,None].float()], 1) |
|
else: |
|
if len(last_target.shape) == 2: |
|
lambd = lambd.unsqueeze(1).float() |
|
new_target = last_target.float() * lambd + y1.float() * (1-lambd) |
|
return {'last_input': new_input, 'last_target': new_target} |
|
|
|
def on_train_end(self, **kwargs): |
|
if self.stack_y: self.learn.loss_func = self.learn.loss_func.get_old() |
|
|
|
|
|
class MixUpLoss(Module): |
|
"Adapt the loss function `crit` to go with mixup." |
|
|
|
def __init__(self, crit, reduction='mean'): |
|
super().__init__() |
|
if hasattr(crit, 'reduction'): |
|
self.crit = crit |
|
self.old_red = crit.reduction |
|
setattr(self.crit, 'reduction', 'none') |
|
else: |
|
self.crit = partial(crit, reduction='none') |
|
self.old_crit = crit |
|
self.reduction = reduction |
|
|
|
def forward(self, output, target): |
|
if len(target.size()) == 2: |
|
loss1, loss2 = self.crit(output,target[:,0].long()), self.crit(output,target[:,1].long()) |
|
d = (loss1 * target[:,2] + loss2 * (1-target[:,2])).mean() |
|
else: d = self.crit(output, target) |
|
if self.reduction == 'mean': return d.mean() |
|
elif self.reduction == 'sum': return d.sum() |
|
return d |
|
|
|
def get_old(self): |
|
if hasattr(self, 'old_crit'): return self.old_crit |
|
elif hasattr(self, 'old_red'): |
|
setattr(self.crit, 'reduction', self.old_red) |
|
return self.crit |
|
|