# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the Apache License, Version 2.0
# found in the LICENSE file in the root directory of this source tree.
import math

import torch
from torch import nn

from models.backbone import resize


def kaiming_init(module: nn.Module,
                 a: float = 0,
                 mode: str = 'fan_out',
                 nonlinearity: str = 'relu',
                 bias: float = 0,
                 distribution: str = 'normal') -> None:
    assert distribution in ['uniform', 'normal']
    if hasattr(module, 'weight') and module.weight is not None:
        if distribution == 'uniform':
            nn.init.kaiming_uniform_(
                module.weight, a=a, mode=mode, nonlinearity=nonlinearity)
        else:
            nn.init.kaiming_normal_(
                module.weight, a=a, mode=mode, nonlinearity=nonlinearity)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)
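

# `ConvModule.init_weights()` below calls `constant_init` on its norm layer,
# but no such helper is defined or imported in this file. The sketch below is
# an assumed stand-in that mirrors mmcv's `constant_init` (constant weight,
# constant bias); swap in the original helper if it is available.
def constant_init(module: nn.Module, val: float, bias: float = 0) -> None:
    if hasattr(module, 'weight') and module.weight is not None:
        nn.init.constant_(module.weight, val)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)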


class ConvModule(nn.Module):
    """A conv block that bundles conv/norm/activation layers.

    This block simplifies the usage of convolution layers, which are commonly
    used together with a norm layer (e.g., BatchNorm) and an activation layer
    (e.g., ReLU). Note that this is a stripped-down variant of mmcv's
    ``ConvModule``: the conv layer is always ``nn.Conv2d``, no norm layer is
    built, and the activation layer is always ``nn.ReLU``. The
    ``conv_cfg``/``norm_cfg``/``act_cfg`` arguments are kept for interface
    compatibility.

    Additional features compared to a plain ``nn.Conv2d``:

    1. Automatically set ``bias`` of the conv layer.
    2. Spectral norm is supported.
    3. More padding modes are supported. Before PyTorch 1.5, ``nn.Conv2d``
       only supports zero and circular padding; "reflect" padding is handled
       by an explicit padding layer.

    Args:
        in_channels (int): Number of channels in the input feature map.
            Same as that in ``nn._ConvNd``.
        out_channels (int): Number of channels produced by the convolution.
            Same as that in ``nn._ConvNd``.
        kernel_size (int | tuple[int]): Size of the convolving kernel.
            Same as that in ``nn._ConvNd``.
        stride (int | tuple[int]): Stride of the convolution.
            Same as that in ``nn._ConvNd``.
        padding (int | tuple[int]): Zero-padding added to both sides of
            the input. Same as that in ``nn._ConvNd``.
        dilation (int | tuple[int]): Spacing between kernel elements.
            Same as that in ``nn._ConvNd``.
        groups (int): Number of blocked connections from input channels to
            output channels. Same as that in ``nn._ConvNd``.
        bias (bool | str): If specified as ``'auto'``, it is decided by
            ``norm_cfg``: bias is set to True if ``norm_cfg`` is None,
            otherwise False. Default: 'auto'.
        conv_cfg (dict): Config dict for the convolution layer. Default: None,
            which means using conv2d.
        norm_cfg (dict): Config dict for the normalization layer.
            Default: None.
        act_cfg (dict): Config dict for the activation layer.
            Default: dict(type='ReLU').
        inplace (bool): Whether to use inplace mode for the activation.
            Default: True.
        with_spectral_norm (bool): Whether to use spectral norm on the conv
            layer. Default: False.
        padding_mode (str): If the ``padding_mode`` is not supported by the
            current ``Conv2d`` in PyTorch, an explicit padding layer is used
            instead. Officially supported modes are ``['zeros', 'circular']``;
            ``'reflect'`` is handled by the explicit padding layer.
            Default: 'zeros'.
        order (tuple[str]): The order of conv/norm/activation layers. It is a
            sequence of "conv", "norm" and "act". Common examples are
            ("conv", "norm", "act") and ("act", "conv", "norm").
            Default: ('conv', 'norm', 'act').
    """

    _abbr_ = 'conv_block'

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride=1,
                 padding=0,
                 dilation=1,
                 groups=1,
                 bias='auto',
                 conv_cfg=None,
                 norm_cfg=None,
                 act_cfg=dict(type='ReLU'),
                 inplace=True,
                 with_spectral_norm=False,
                 padding_mode='zeros',
                 order=('conv', 'norm', 'act')):
        super().__init__()
        assert conv_cfg is None or isinstance(conv_cfg, dict)
        assert norm_cfg is None or isinstance(norm_cfg, dict)
        assert act_cfg is None or isinstance(act_cfg, dict)
        official_padding_mode = ['zeros', 'circular']
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.act_cfg = act_cfg
        self.inplace = inplace
        self.with_spectral_norm = with_spectral_norm
        self.with_explicit_padding = padding_mode not in official_padding_mode
        self.order = order
        assert isinstance(self.order, tuple) and len(self.order) == 3
        assert set(order) == {'conv', 'norm', 'act'}
        self.with_norm = norm_cfg is not None
        self.with_activation = act_cfg is not None
        # if the conv layer is before a norm layer, bias is unnecessary.
        if bias == 'auto':
            bias = not self.with_norm
        self.with_bias = bias
        if self.with_explicit_padding:
            # minimal replacement for mmcv's build_padding_layer: only the
            # padding modes listed below are handled here
            pad_layers = {'reflect': nn.ReflectionPad2d,
                          'replicate': nn.ReplicationPad2d}
            assert padding_mode in pad_layers, \
                f'unsupported padding mode: {padding_mode}'
            self.padding_layer = pad_layers[padding_mode](padding)
        # reset padding to 0 for the conv module when padding is done by the
        # explicit padding layer
        conv_padding = 0 if self.with_explicit_padding else padding
        # build convolution layer (a plain nn.Conv2d stands in for mmcv's
        # build_conv_layer(conv_cfg, ...) in this stripped-down module)
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=conv_padding,
            dilation=dilation,
            groups=groups,
            bias=bias)
        # export the attributes of self.conv to a higher level for convenience
        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups
        if self.with_spectral_norm:
            self.conv = nn.utils.spectral_norm(self.conv)

        # no norm layer is built in this module
        self.norm_name = None  # type: ignore
        # build activation layer (a plain nn.ReLU stands in for mmcv's
        # build_activation_layer(act_cfg_); only ReLU is supported here)
        if self.with_activation:
            act_cfg_ = act_cfg.copy()  # type: ignore
            # nn.Tanh has no 'inplace' argument
            if act_cfg_['type'] not in [
                    'Tanh', 'PReLU', 'Sigmoid', 'HSigmoid', 'Swish', 'GELU'
            ]:
                act_cfg_.setdefault('inplace', inplace)
            self.activate = nn.ReLU()

        # Use msra (kaiming) init by default; the seed is fixed so that the
        # initialization is reproducible across runs.
        torch.manual_seed(1)
        self.init_weights()

    @property
    def norm(self):
        if self.norm_name:
            return getattr(self, self.norm_name)
        else:
            return None

    def init_weights(self):
        # 1. It is mainly for customized conv layers with their own
        #    initialization manners by calling their own ``init_weights()``,
        #    and we do not want ConvModule to override the initialization.
        # 2. For customized conv layers without their own initialization
        #    manners (that is, they don't have their own ``init_weights()``)
        #    and PyTorch's conv layers, they will be initialized by
        #    this method with the default ``kaiming_init``.
        # Note: For PyTorch's conv layers, they will be overwritten by our
        # initialization implementation using the default ``kaiming_init``.
        if not hasattr(self.conv, 'init_weights'):
            if self.with_activation and self.act_cfg['type'] == 'LeakyReLU':
                nonlinearity = 'leaky_relu'
                a = self.act_cfg.get('negative_slope', 0.01)
            else:
                nonlinearity = 'relu'
                a = 0
            kaiming_init(self.conv, a=a, nonlinearity=nonlinearity)
        if self.with_norm:
            constant_init(self.norm, 1, bias=0)

    def forward(self,
                x: torch.Tensor,
                activate: bool = True,
                norm: bool = True,
                debug: bool = False) -> torch.Tensor:
        for layer in self.order:
            if debug:
                breakpoint()
            if layer == 'conv':
                if self.with_explicit_padding:
                    x = self.padding_layer(x)
                x = self.conv(x)
            elif layer == 'norm' and norm and self.with_norm:
                x = self.norm(x)
            elif layer == 'act' and activate and self.with_activation:
                x = self.activate(x)
        return x
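

# Illustrative usage of ConvModule (a minimal sketch, not from the original
# source; shapes assume a random 3-channel input batch):
#   conv = ConvModule(3, 64, kernel_size=3, padding=1, act_cfg=dict(type='ReLU'))
#   y = conv(torch.randn(1, 3, 224, 224))  # -> shape (1, 64, 224, 224)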


class Interpolate(nn.Module):

    def __init__(self, scale_factor, mode, align_corners=False):
        super(Interpolate, self).__init__()
        self.interp = nn.functional.interpolate
        self.scale_factor = scale_factor
        self.mode = mode
        self.align_corners = align_corners

    def forward(self, x):
        x = self.interp(
            x,
            scale_factor=self.scale_factor,
            mode=self.mode,
            align_corners=self.align_corners)
        return x


class HeadDepth(nn.Module):

    def __init__(self, features, classify=False, n_bins=256):
        super(HeadDepth, self).__init__()
        self.head = nn.Sequential(
            nn.Conv2d(features, features // 2, kernel_size=3, stride=1, padding=1),
            Interpolate(scale_factor=2, mode="bilinear", align_corners=True),
            nn.Conv2d(features // 2, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 1 if not classify else n_bins, kernel_size=1, stride=1, padding=0),
        )

    def forward(self, x):
        x = self.head(x)
        return x
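

# Shape sketch for HeadDepth (illustrative, not from the original source): an
# input of shape (B, features, H, W) is reduced to features // 2 channels,
# upsampled by 2x, and projected to a single depth channel (or to n_bins
# channels when classify=True), i.e. (B, features, H, W) -> (B, 1, 2H, 2W).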


class ReassembleBlocks(nn.Module):
    """ViTPostProcessBlock: process the cls_token in the ViT backbone output
    and rearrange the feature vectors into feature maps.

    Args:
        in_channels (int): ViT feature channels. Default: 1024.
        out_channels (List): Output channels of each stage.
            Default: [128, 256, 512, 1024].
        readout_type (str): Type of readout operation, one of 'ignore', 'add'
            or 'project'. Default: 'project'.
        patch_size (int): The patch size. Default: 16.
    """

    def __init__(self,
                 in_channels=1024,  # 768 in the original DPT configuration
                 out_channels=[128, 256, 512, 1024],  # originally [96, 192, 384, 768]
                 readout_type='project',  # originally 'ignore'
                 patch_size=16):
        super(ReassembleBlocks, self).__init__()
        assert readout_type in ['ignore', 'add', 'project']
        self.readout_type = readout_type
        self.patch_size = patch_size

        self.projects = nn.ModuleList([
            ConvModule(
                in_channels=in_channels,
                out_channels=out_channel,
                kernel_size=1,
                act_cfg=None,
            ) for out_channel in out_channels
        ])
        self.resize_layers = nn.ModuleList([
            nn.ConvTranspose2d(
                in_channels=out_channels[0],
                out_channels=out_channels[0],
                kernel_size=4,
                stride=4,
                padding=0),
            nn.ConvTranspose2d(
                in_channels=out_channels[1],
                out_channels=out_channels[1],
                kernel_size=2,
                stride=2,
                padding=0),
            nn.Identity(),
            nn.Conv2d(
                in_channels=out_channels[3],
                out_channels=out_channels[3],
                kernel_size=3,
                stride=2,
                padding=1)
        ])
        if self.readout_type == 'project':
            self.readout_projects = nn.ModuleList()
            for _ in range(len(self.projects)):
                self.readout_projects.append(
                    nn.Sequential(
                        nn.Linear(2 * in_channels, in_channels),
                        # GELU stands in for build_activation_layer(dict(type='GELU'))
                        nn.GELU()))

    def forward(self, inputs):
        assert isinstance(inputs, list)
        out = []
        for i, x in enumerate(inputs):
            assert len(x) == 2
            x, cls_token = x[0], x[1]
            feature_shape = x.shape
            if self.readout_type == 'project':
                x = x.flatten(2).permute((0, 2, 1))
                readout = cls_token.unsqueeze(1).expand_as(x)
                x = self.readout_projects[i](torch.cat((x, readout), -1))
                x = x.permute(0, 2, 1).reshape(feature_shape)
            elif self.readout_type == 'add':
                x = x.flatten(2) + cls_token.unsqueeze(-1)
                x = x.reshape(feature_shape)
            else:
                pass
            x = self.projects[i](x)
            x = self.resize_layers[i](x)
            out.append(x)
        return out
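

# Input/output sketch for ReassembleBlocks (illustrative, not from the
# original source): `inputs` is a list of (feature_map, cls_token) pairs, one
# per stage, with feature_map of shape (B, in_channels, h, w) and cls_token of
# shape (B, in_channels). With the defaults above, a 14x14 token grid produces
# a feature pyramid of shapes
#   (B, 128, 56, 56), (B, 256, 28, 28), (B, 512, 14, 14), (B, 1024, 7, 7).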


class PreActResidualConvUnit(nn.Module):
    """ResidualConvUnit, a pre-activation residual unit.

    Args:
        in_channels (int): Number of channels in the input feature map.
        act_cfg (dict): Dictionary to construct and config the activation
            layer.
        norm_cfg (dict): Dictionary to construct and config the norm layer.
        stride (int): Stride of the first block. Default: 1.
        dilation (int): Dilation rate for the conv layers. Default: 1.
        init_cfg (dict, optional): Initialization config dict (unused, kept
            for interface compatibility). Default: None.
    """

    def __init__(self,
                 in_channels,
                 act_cfg,
                 norm_cfg,
                 stride=1,
                 dilation=1,
                 init_cfg=None):
        super(PreActResidualConvUnit, self).__init__()
        self.conv1 = ConvModule(
            in_channels,
            in_channels,
            3,
            stride=stride,
            padding=dilation,
            dilation=dilation,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg,
            bias=False,
            order=('act', 'conv', 'norm'))
        self.conv2 = ConvModule(
            in_channels,
            in_channels,
            3,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg,
            bias=False,
            order=('act', 'conv', 'norm'))

    def forward(self, inputs):
        inputs_ = inputs.clone()
        x = self.conv1(inputs)
        x = self.conv2(x)
        return x + inputs_
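
    # Note (not from the original source): both convs preserve the channel
    # count and, with the default stride=1, the spatial size, so the residual
    # addition `x + inputs_` is shape-safe. A stride other than 1 would
    # downsample the conv path but not the identity path and break the add.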


class FeatureFusionBlock(nn.Module):
    """FeatureFusionBlock: merge feature maps from different stages.

    Args:
        in_channels (int): Input channels.
        act_cfg (dict): The activation config for ResidualConvUnit.
        norm_cfg (dict): Config dict for the normalization layer.
        expand (bool): Whether to expand the channels in the post process
            block. Default: False.
        align_corners (bool): align_corners setting for bilinear upsample.
            Default: True.
        init_cfg (dict, optional): Initialization config dict (unused, kept
            for interface compatibility). Default: None.
    """

    def __init__(self,
                 in_channels,
                 act_cfg,
                 norm_cfg,
                 expand=False,
                 align_corners=True,
                 init_cfg=None):
        super(FeatureFusionBlock, self).__init__()
        self.in_channels = in_channels
        self.expand = expand
        self.align_corners = align_corners
        self.out_channels = in_channels
        if self.expand:
            self.out_channels = in_channels // 2

        self.project = ConvModule(
            self.in_channels,
            self.out_channels,
            kernel_size=1,
            act_cfg=None,
            bias=True)
        self.res_conv_unit1 = PreActResidualConvUnit(
            in_channels=self.in_channels, act_cfg=act_cfg, norm_cfg=norm_cfg)
        self.res_conv_unit2 = PreActResidualConvUnit(
            in_channels=self.in_channels, act_cfg=act_cfg, norm_cfg=norm_cfg)

    def forward(self, *inputs):
        x = inputs[0]
        if len(inputs) == 2:
            if x.shape != inputs[1].shape:
                res = resize(
                    inputs[1],
                    size=(x.shape[2], x.shape[3]),
                    mode='bilinear',
                    align_corners=False)
            else:
                res = inputs[1]
            x = x + self.res_conv_unit1(res)
        x = self.res_conv_unit2(x)
        x = resize(
            x, scale_factor=2, mode='bilinear', align_corners=self.align_corners)
        x = self.project(x)
        return x
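
    # Note (not from the original source): when two inputs are given, the
    # second (skip) feature is resized to match the first, added through the
    # first residual unit, refined by the second, upsampled by 2x, and
    # projected; with a single input only the refine/upsample/project path
    # runs, which is how DPTHead uses the deepest stage.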


class DPTHead(nn.Module):
    """Vision Transformers for Dense Prediction.

    This head is an implementation of `DPT <https://arxiv.org/abs/2103.13413>`_.

    Args:
        in_channels (tuple[int]): Channels of the input feature maps.
            Default: (1024, 1024, 1024, 1024).
        channels (int): Channels of the intermediate feature maps.
            Default: 256.
        embed_dims (int): The embed dimension of the ViT backbone.
            Default: 1024.
        post_process_channels (List): Out channels of the post process conv
            layers. Default: [128, 256, 512, 1024].
        readout_type (str): Type of readout operation. Default: 'project'.
        patch_size (int): The patch size. Default: 16.
        expand_channels (bool): Whether to expand the channels in the post
            process block. Default: False.
        min_depth (float): Minimum predicted depth. Default: 0.001.
        classify (bool): Whether to predict depth as a distribution over
            ``n_bins`` discrete bins instead of a single regression channel.
            Default: False.
        n_bins (int): Number of depth bins used when ``classify`` is True.
            Default: 256.
    """

    def __init__(self,
                 in_channels=(1024, 1024, 1024, 1024),
                 channels=256,
                 embed_dims=1024,
                 post_process_channels=[128, 256, 512, 1024],
                 readout_type='project',
                 patch_size=16,
                 expand_channels=False,
                 min_depth=0.001,
                 classify=False,
                 n_bins=256,
                 **kwargs):
        super(DPTHead, self).__init__(**kwargs)
        # fix the RNG state so that the initialization is reproducible
        torch.manual_seed(1)

        self.channels = channels
        self.norm_cfg = None
        self.min_depth = min_depth
        self.max_depth = 10
        self.n_bins = n_bins
        self.classify = classify
        self.in_channels = in_channels
        self.expand_channels = expand_channels
        self.reassemble_blocks = ReassembleBlocks(in_channels=embed_dims,
                                                  out_channels=post_process_channels)
        self.post_process_channels = [
            channel * math.pow(2, i) if expand_channels else channel
            for i, channel in enumerate(post_process_channels)
        ]
        self.convs = nn.ModuleList()
        for channel in self.post_process_channels:
            self.convs.append(
                ConvModule(
                    channel,
                    self.channels,
                    kernel_size=3,
                    padding=1,
                    act_cfg=None,
                    bias=False))
        self.fusion_blocks = nn.ModuleList()
        self.act_cfg = {'type': 'ReLU'}
        for _ in range(len(self.convs)):
            self.fusion_blocks.append(
                FeatureFusionBlock(self.channels, self.act_cfg, self.norm_cfg))
        # the first fusion block only ever receives a single input (the
        # deepest stage), so its first residual unit is never used
        self.fusion_blocks[0].res_conv_unit1 = None
        torch.manual_seed(1)
        self.project = ConvModule(
            self.channels,
            self.channels,
            kernel_size=3,
            padding=1,
            norm_cfg=None)
        self.num_fusion_blocks = len(self.fusion_blocks)
        self.num_reassemble_blocks = len(self.reassemble_blocks.resize_layers)
        self.num_post_process_channels = len(self.post_process_channels)
        assert self.num_fusion_blocks == self.num_reassemble_blocks
        assert self.num_reassemble_blocks == self.num_post_process_channels
        self.conv_depth = HeadDepth(self.channels, self.classify, self.n_bins)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        assert len(inputs) == self.num_reassemble_blocks
        x = [inp for inp in inputs]
        x = self.reassemble_blocks(x)
        x = [self.convs[i](feature) for i, feature in enumerate(x)]
        out = self.fusion_blocks[0](x[-1])
        for i in range(1, len(self.fusion_blocks)):
            out = self.fusion_blocks[i](out, x[-(i + 1)])
        out = self.project(out)
        if self.classify:
            logit = self.conv_depth(out)
            # uniform (UD) bin strategy: bin centers evenly spaced between
            # min_depth and max_depth
            bins = torch.linspace(self.min_depth, self.max_depth, self.n_bins,
                                  device=inputs[0][0].device)
            # linear normalization strategy: shifted ReLU followed by
            # normalization so the per-pixel bin weights sum to one
            logit = torch.relu(logit)
            eps = 0.1
            logit = logit + eps
            logit = logit / logit.sum(dim=1, keepdim=True)
            # expected depth over the bins (min_depth is not added back here)
            out = torch.einsum('ikmn,k->imn', [logit, bins]).unsqueeze(dim=1)
        else:
            out = self.relu(self.conv_depth(out)) + self.min_depth
        return out
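

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the original file).
    # It assumes a ViT backbone with patch_size=16 applied to a 224x224 image,
    # i.e. a 14x14 token grid with 1024-dim embeddings, and that
    # `models.backbone.resize` behaves like `nn.functional.interpolate`.
    head = DPTHead()
    dummy_features = [
        (torch.randn(1, 1024, 14, 14), torch.randn(1, 1024)) for _ in range(4)
    ]
    depth = head(dummy_features)
    print(depth.shape)  # expected: torch.Size([1, 1, 224, 224])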