import torch import math from torch import nn import torch.nn.functional as F from transformers import PreTrainedModel, GenerationMixin from transformers.cache_utils import DynamicCache, Cache from transformers.modeling_attn_mask_utils import AttentionMaskConverter from transformers.cache_utils import Cache, DynamicCache, StaticCache from transformers.utils import logging from transformers.modeling_outputs import ( BaseModelOutputWithPast, CausalLMOutputWithPast, ) from transformers.activations import ACT2FN from typing import Optional, Tuple, Union, List from .configuration_neuroblast import NeuroBLASTConfig CLAMP_VALUE = 1e5 logger = logging.get_logger(__name__) def apply_gradient_scaling( tensor: torch.Tensor, scale: float, enabled: bool = True ) -> torch.Tensor: """ Apply gradient scaling to a tensor. This scales the gradients during backward pass while keeping forward pass unchanged. """ if not enabled or scale == 1.0 or not tensor.requires_grad: return tensor # Use a custom autograd function for gradient scaling class GradientScale(torch.autograd.Function): @staticmethod def forward(ctx, input_tensor, scale_factor): ctx.scale = scale_factor return input_tensor.clone() @staticmethod def backward(ctx, grad_output): if grad_output is None: return None, None return grad_output * ctx.scale, None return GradientScale.apply(tensor, scale) def _prepare_4d_causal_attention_mask_with_cache_position( attention_mask: torch.Tensor, sequence_length: int, target_length: int, dtype: torch.dtype, device: torch.device, min_dtype: float, cache_position: torch.Tensor, batch_size: int, ): if attention_mask is not None and attention_mask.dim() == 4: causal_mask = attention_mask else: causal_mask = torch.full( (sequence_length, target_length), fill_value=min_dtype, dtype=dtype, device=device, ) if sequence_length != 1: causal_mask = torch.triu(causal_mask, diagonal=1) causal_mask *= torch.arange( target_length, device=device ) > cache_position.reshape(-1, 1) causal_mask = causal_mask[None, None, :, :].expand(batch_size, 1, -1, -1) if attention_mask is not None: causal_mask = ( causal_mask.clone() ) mask_length = attention_mask.shape[-1] padding_mask = ( causal_mask[:, :, :, :mask_length] + attention_mask[:, None, None, :] ) padding_mask = padding_mask == 0 causal_mask[:, :, :, :mask_length] = causal_mask[ :, :, :, :mask_length ].masked_fill(padding_mask, min_dtype) return causal_mask # --- RoPE Implementation (using HF LlamaRotaryEmbedding) --- class LlamaRMSNorm(nn.Module): def __init__(self, hidden_size, eps=1e-6): """ LlamaRMSNorm is equivalent to T5LayerNorm """ super().__init__() self.weight = nn.Parameter(torch.ones(hidden_size)) self.variance_epsilon = eps def forward(self, hidden_states): input_dtype = hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance = hidden_states.pow(2).mean(-1, keepdim=True) hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) return self.weight * hidden_states.to(input_dtype) def extra_repr(self): return f"{tuple(self.weight.shape)}, eps={self.variance_epsilon}" class NeuroBLASTRotaryEmbedding(nn.Module): """ Rotary Positional Embedding for NeuroBLAST model. Source: LlamaRotaryEmbedding """ def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None): super().__init__() self.dim = dim self.max_position_embeddings = max_position_embeddings self.base = base inv_freq = 1.0 / ( self.base ** (torch.arange(0, self.dim, 2, dtype=torch.float32).to(device) / self.dim) ) self.register_buffer("inv_freq", inv_freq, persistent=False) self._set_cos_sin_cache( seq_len=max_position_embeddings, device="cpu", dtype=torch.float32 ) def _set_cos_sin_cache(self, seq_len, device, dtype): self.max_seq_len_cached = seq_len t = torch.arange( self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype ) freqs = torch.einsum("i,j->ij", t, self.inv_freq) emb = torch.cat((freqs, freqs), dim=-1) self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False) self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False) def forward(self, x, seq_len=None): if seq_len > self.max_seq_len_cached: self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype) elif self.cos_cached.device != x.device or self.cos_cached.dtype != x.dtype: self.cos_cached = self.cos_cached.to(device=x.device, dtype=x.dtype) self.sin_cached = self.sin_cached.to(device=x.device, dtype=x.dtype) return ( self.cos_cached[:seq_len], self.sin_cached[:seq_len], ) def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) def apply_rotary_pos_emb(q, k, cos, sin, position_ids): """ Applies rotary positional embeddings to query and key tensors.""" cos = cos[position_ids].unsqueeze(1) # [bs, 1, seq_len, dim] sin = sin[position_ids].unsqueeze(1) # [bs, 1, seq_len, dim] q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) return q_embed, k_embed # Overload for Cross Attention where only query is rotated def apply_rotary_pos_emb_single(q, cos, sin, position_ids): """ Applies rotary positional embeddings to query tensor. """ cos = cos[position_ids].unsqueeze(1) # [1, 1, seq_len, dim] sin = sin[position_ids].unsqueeze(1) # [1, 1, seq_len, dim] q_embed = (q * cos) + (rotate_half(q) * sin) return q_embed class SwiGLUMLP(nn.Module): """SwiGLU MLP block""" def __init__(self, hidden_size, config: NeuroBLASTConfig, dropout): super().__init__() intermediate_size = getattr(config, "intermediate_size", int(hidden_size * 2.5)) self.init_std = getattr(config, "initializer_range", 0.02) self.clamp_value = config.clamp_value self.gate_proj = nn.Linear(hidden_size, intermediate_size, bias=True) self.up_proj = nn.Linear(hidden_size, intermediate_size, bias=False) self.down_proj = nn.Linear(intermediate_size, hidden_size, bias=False) self.act_fn = nn.SiLU() self.dropout = nn.Dropout(dropout) if config.num_experts is not None: self.experts = NeuroBLASTSparseMoeBlock(config) # Re-enable scaled initialization with torch.no_grad(): # Scale down initial weights in the up/gate projections self.gate_proj.weight.data.normal_( mean=0.0, std=self.init_std / math.sqrt(hidden_size), # Scale by input dim ) if self.gate_proj.bias is not None: self.gate_proj.bias.data.zero_() self.up_proj.weight.data.normal_( mean=0.0, std=self.init_std / math.sqrt(hidden_size) ) # Scale by input dim # Scale down initial weights in the down projection even further self.down_proj.weight.data.normal_( mean=0.0, std=self.init_std / math.sqrt(intermediate_size), # Scale by intermediate dim ) def forward(self, x): gated_x = self.gate_proj(x) activated_x = self.act_fn(gated_x) up_projected_x = self.up_proj(x) intermediate_activation = activated_x * up_projected_x # Clamp the intermediate activation before down_proj clamp_value = self.clamp_value intermediate_activation = torch.clamp( intermediate_activation, min=-clamp_value, max=clamp_value ) intermediate_activation = torch.nan_to_num( intermediate_activation ) # Safeguard against NaNs y = self.down_proj(intermediate_activation) y = self.dropout(y) if hasattr(self, "experts"): z = self.experts(y) y = y + z return y class NeuroBLASTMoeMLP(nn.Module): """ Source: Qwen3MoeMLP """ def __init__(self, config, intermediate_size=None): super().__init__() self.config = config self.clamp_value = config.clamp_value self.hidden_size = config.hidden_size self.intermediate_size = ( intermediate_size if intermediate_size is not None else config.intermediate_size ) self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) self.act_fn = ACT2FN[config.hidden_act] def forward(self, x): down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) down_proj = torch.clamp(down_proj, min=-self.clamp_value, max=self.clamp_value) down_proj = torch.nan_to_num(down_proj) # Safeguard against NaNs return down_proj class NeuroBLASTSparseMoeBlock(nn.Module): """ Source: Qwen3SparseMoeBlock """ def __init__(self, config): super().__init__() self.num_experts = config.num_experts self.top_k = config.num_experts_per_tok self.norm_topk_prob = config.norm_topk_prob # gating self.gate = nn.Linear(config.hidden_size, config.num_experts, bias=False) self.experts = nn.ModuleList( [NeuroBLASTMoeMLP(config) for _ in range(self.num_experts)] ) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: """ """ batch_size, sequence_length, hidden_dim = hidden_states.shape hidden_states = hidden_states.view(-1, hidden_dim) # router_logits: (batch * sequence_length, n_experts) router_logits = self.gate(hidden_states) routing_weights = F.softmax(router_logits, dim=1, dtype=torch.float) routing_weights, selected_experts = torch.topk( routing_weights, self.top_k, dim=-1 ) if self.norm_topk_prob: # only diff with mixtral sparse moe block! routing_weights /= routing_weights.sum(dim=-1, keepdim=True) # we cast back to the input dtype routing_weights = routing_weights.to(hidden_states.dtype) final_hidden_states = torch.zeros( (batch_size * sequence_length, hidden_dim), dtype=hidden_states.dtype, device=hidden_states.device, ) # One hot encode the selected experts to create an expert mask # this will be used to easily index which expert is going to be sollicitated expert_mask = torch.nn.functional.one_hot( selected_experts, num_classes=self.num_experts ).permute(2, 1, 0) # Loop over all available experts in the model and perform the computation on each expert for expert_idx in range(self.num_experts): expert_layer = self.experts[expert_idx] idx, top_x = torch.where(expert_mask[expert_idx]) # Index the correct hidden states and compute the expert hidden state for # the current expert. We need to make sure to multiply the output hidden # states by `routing_weights` on the corresponding tokens (top-1 and top-2) current_state = hidden_states[None, top_x].reshape(-1, hidden_dim) current_hidden_states = ( expert_layer(current_state) * routing_weights[top_x, idx, None] ) # However `index_add_` only support torch tensors for indexing so we'll use # the `top_x` tensor here. final_hidden_states.index_add_( 0, top_x, current_hidden_states.to(hidden_states.dtype) ) final_hidden_states = final_hidden_states.reshape( batch_size, sequence_length, hidden_dim ) return final_hidden_states class NeuroBLASTRouterBlock(nn.Module): """ Memory router; overcomplicated due to backward compatibility """ def __init__( self, config, hidden_size, ): super().__init__() self.num_experts = 2 self.top_k = 1 self.norm_topk_prob = config.norm_topk_prob # gating self.gate = nn.Linear(hidden_size, self.num_experts, bias=False) def forward(self, hidden_states: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: """ """ batch_size, sequence_length, hidden_dim = hidden_states.shape hidden_states = hidden_states.view(-1, hidden_dim) # router_logits: (batch * sequence_length, n_experts) router_logits = self.gate(hidden_states) routing_weights = F.softmax(router_logits, dim=1, dtype=torch.float) routing_weights, selected_experts = torch.topk( routing_weights, self.top_k, dim=-1 ) if self.norm_topk_prob: # only diff with mixtral sparse moe block! routing_weights /= routing_weights.sum(dim=-1, keepdim=True) # we cast back to the input dtype routing_weights = routing_weights.to(hidden_states.dtype) return routing_weights, selected_experts class SelfAttention(torch.nn.Module): def __init__( self, config: NeuroBLASTConfig, hidden_size: int, is_causal: bool = False, layer_idx: Optional[int] = None, ): super().__init__() self.is_causal = is_causal self.dropout_p = config.dropout # Will apply based on self.training self.layer_idx = layer_idx self.hidden_size = hidden_size self.intermediate_size = config.kv_dim self.use_flash_attn = config.use_flash_attn # Allow overriding num_heads, default to config.num_attention_heads self.num_heads = getattr( config, f"num_heads_{layer_idx}", config.num_attention_heads ) self.head_dim = self.intermediate_size // self.num_heads if (self.head_dim * self.num_heads) != self.intermediate_size: raise ValueError( f"Layer {self.layer_idx}: hidden_size ({self.intermediate_size}) must be divisible by num_heads ({self.num_heads})" ) self.qkv_proj = nn.Linear( self.hidden_size, self.intermediate_size * 3, bias=True ) self.o_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) self.dropout = nn.Dropout(config.dropout) def forward( self, hidden_states: torch.Tensor, position_embeddings: Tuple[torch.Tensor, torch.Tensor], # (cos, sin) attention_mask: Optional[ torch.Tensor ], # Not directly used by flash_attn causal past_key_value: Optional[Cache] = None, cache_position: Optional[torch.LongTensor] = None, output_attentions: Optional[bool] = False, use_cache: Optional[bool] = False, position_ids: Optional[torch.LongTensor] = None, ): batch_size, seq_len, _ = hidden_states.shape dropout_p = self.dropout_p if self.training else 0.0 qkv = self.qkv_proj(hidden_states) query_states, key_states, value_states = qkv.chunk(3, dim=-1) query_states = query_states.view( batch_size, seq_len, self.num_heads, self.head_dim ).transpose(1, 2) key_states = key_states.view( batch_size, seq_len, self.num_heads, self.head_dim ).transpose(1, 2) value_states = value_states.view( batch_size, seq_len, self.num_heads, self.head_dim ).transpose(1, 2) cos, sin = position_embeddings query_states, key_states = apply_rotary_pos_emb( query_states, key_states, cos, sin, position_ids ) if past_key_value is not None: cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} # When using Cache object, updating happens in-place. key_states, value_states = past_key_value.update( key_states, value_states, self.layer_idx, cache_kwargs ) if self.use_flash_attn: causal_mask = attention_mask if attention_mask is not None: if attention_mask.dim() == 4: causal_mask = attention_mask[:, :, :, : key_states.shape[-2]] elif attention_mask.dim() == 2: causal_mask = attention_mask if causal_mask.dtype not in [ torch.bool, torch.float16, torch.float32, torch.bfloat16, ]: causal_mask = causal_mask.to(query_states.dtype) is_causal = ( True if causal_mask is None and query_states.shape[-2] > 1 else False ) attn_output = F.scaled_dot_product_attention( query_states, key_states, value_states, attn_mask=causal_mask, dropout_p=dropout_p, enable_gqa=False, scale=self.head_dim**-0.5, is_causal=is_causal, ) attn_weights = None else: attn_weights = torch.matmul(query_states, key_states.transpose(-1, -2)) / ( self.head_dim**0.5 ) if attention_mask is not None: attn_weights = attn_weights + attention_mask elif self.is_causal and seq_len > 1: causal_mask = torch.triu( torch.ones( (seq_len, key_states.shape[2]), dtype=torch.bool, device=query_states.device, ), diagonal=1, ) attn_weights = attn_weights.masked_fill( causal_mask.unsqueeze(0).unsqueeze(0), float("-inf") ) attn_weights = F.softmax(attn_weights, dim=-1) attn_weights_dropped = F.dropout( attn_weights, p=dropout_p, training=self.training ) attn_output = torch.matmul(attn_weights_dropped, value_states) attn_output = attn_output.transpose(1, 2).contiguous() attn_output = attn_output.view(batch_size, seq_len, -1) attn_output = self.o_proj(attn_output) attn_output = self.dropout(attn_output) if output_attentions: logger.warning( f"Layer {self.layer_idx}: Flash Attention does not return attention weights." ) outputs = (attn_output,) if output_attentions: outputs += (attn_weights,) if use_cache: outputs += (past_key_value,) # Return the cache object return outputs class CrossAttention(torch.nn.Module): def __init__( self, config: NeuroBLASTConfig, query_dim: int, kv_dim: int, layer_idx: int, is_causal: bool = True, ): super().__init__() self.dropout_p = config.dropout self.layer_idx = layer_idx self.query_dim = query_dim self.kv_dim = kv_dim self.is_causal = is_causal self.num_heads = config.num_attention_heads self.head_dim = self.kv_dim // self.num_heads self.kv_head_dim = ( self.kv_dim // self.num_heads ) if (self.head_dim * self.num_heads) != self.kv_dim: raise ValueError( f"CrossAttn {layer_idx}: query_dim ({self.kv_dim}) must be divisible by num_heads ({self.num_heads})" ) if (self.kv_head_dim * self.num_heads) != self.kv_dim: raise ValueError( f"CrossAttn {layer_idx}: kv_dim ({kv_dim}) must be divisible by num_heads ({self.num_heads})" ) self.q_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True) self.k_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True) self.v_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True) self.o_proj = nn.Linear(self.kv_dim, self.query_dim, bias=False) self.dropout = nn.Dropout(config.dropout) self.use_flash_attn = hasattr( F, "scaled_dot_product_attention" ) def forward( self, query_states: torch.Tensor, kv_states: torch.Tensor, position_embeddings: Tuple[ torch.Tensor, torch.Tensor ], past_key_value: Optional[Cache] = None, cache_position: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = False, position_ids: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = False, ): batch_size, q_seq_len, _ = query_states.shape kv_seq_len = kv_states.shape[1] dropout_p = self.dropout_p if self.training else 0.0 query = self.q_proj(query_states) key = self.k_proj(kv_states) value = self.v_proj(kv_states) query = query.view( batch_size, q_seq_len, self.num_heads, self.head_dim ).transpose(1, 2) cos, sin = position_embeddings query = apply_rotary_pos_emb_single(query, cos, sin, position_ids) if past_key_value is not None: cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} key, value = past_key_value.update( key, value, self.layer_idx or 0, cache_kwargs ) kv_seq_len = key.shape[1] key = key.view( batch_size, kv_seq_len, self.num_heads, self.kv_head_dim ).transpose(1, 2) value = value.view( batch_size, kv_seq_len, self.num_heads, self.kv_head_dim ).transpose(1, 2) sdpa_attn_mask = attention_mask if self.use_flash_attn: is_causal = True if sdpa_attn_mask is None and q_seq_len > 1 else False attn_output = F.scaled_dot_product_attention( query, key, value, attn_mask=sdpa_attn_mask if not is_causal else None, dropout_p=dropout_p, is_causal=is_causal, enable_gqa=False, scale=self.head_dim**-0.5, ) attn_weights = None else: attn_weights = torch.matmul(query, key.transpose(-1, -2)) / ( self.head_dim**0.5 ) if sdpa_attn_mask is not None: attn_weights = attn_weights + sdpa_attn_mask elif self.is_causal and q_seq_len > 1: causal_mask = torch.triu( torch.ones( (q_seq_len, kv_seq_len), dtype=torch.bool, device=query.device ), diagonal=1, ) attn_weights = attn_weights.masked_fill( causal_mask.unsqueeze(0).unsqueeze(0), float("-inf") ) attn_weights = F.softmax(attn_weights, dim=-1) attn_weights_dropped = F.dropout( attn_weights, p=dropout_p, training=self.training ) attn_output = torch.matmul(attn_weights_dropped, value) attn_output = attn_output.transpose( 1, 2 ).contiguous() attn_output = attn_output.reshape(batch_size, q_seq_len, self.kv_dim) attn_output = self.o_proj(attn_output) attn_output = self.dropout(attn_output) outputs = (attn_output,) if output_attentions: outputs += (attn_weights,) if use_cache: outputs += (past_key_value,) return outputs class AttentionBlock(torch.nn.Module): """Modified Attention Block with Pre-Norm and choice of Self/Cross Attention & MLP""" def __init__( self, config: NeuroBLASTConfig, hidden_size: int, attention_module: nn.Module, mlp_module: nn.Module, is_cross_attention: bool = False, layer_idx: int = 0, precomputed_total_layers: Optional[int] = None, ): super().__init__() self.hidden_size = hidden_size self.config = config self.layer_idx = layer_idx self.is_cross_attention = is_cross_attention self.input_layernorm = nn.LayerNorm( hidden_size, eps=getattr(config, "rms_norm_eps", 1e-5) ) self.attention = attention_module self.post_attention_layernorm = nn.LayerNorm( hidden_size, eps=getattr(config, "rms_norm_eps", 1e-5) ) self.mlp = mlp_module if self.config.use_zero_memory and ( self.config.zero_memory_layers is None or self.layer_idx in self.config.zero_memory_layers ): self.router = NeuroBLASTRouterBlock(config, hidden_size) self.memory = NeuroBLASTMemory( config, hidden_size=hidden_size, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) def forward( self, hidden_states: torch.Tensor, position_embeddings: Tuple[torch.Tensor, torch.Tensor], attention_mask: Optional[torch.Tensor], position_ids: Optional[torch.LongTensor], kv_states: Optional[torch.Tensor] = None, past_key_value: Optional[Cache] = None, cache_position: Optional[torch.LongTensor] = None, output_attentions: Optional[bool] = False, use_cache: Optional[bool] = False, previous_states: Optional[Tuple[torch.Tensor, torch.Tensor]] = None, ): residual = hidden_states hidden_states = torch.nan_to_num(hidden_states) normed_hidden_states = self.input_layernorm(hidden_states) if self.is_cross_attention: if kv_states is None: raise ValueError("kv_states must be provided for CrossAttention") attn_outputs = self.attention( query_states=normed_hidden_states, kv_states=kv_states, past_key_value=past_key_value, cache_position=cache_position, position_embeddings=position_embeddings, attention_mask=attention_mask, output_attentions=output_attentions, position_ids=position_ids, use_cache=use_cache, ) else: attn_outputs = self.attention( normed_hidden_states, position_embeddings=position_embeddings, attention_mask=attention_mask, past_key_value=past_key_value, cache_position=cache_position, output_attentions=output_attentions, use_cache=use_cache, position_ids=position_ids, ) attn_output = attn_outputs[0] past_key_value = attn_outputs[-1] if use_cache else None hidden_states = residual + attn_output hidden_states = torch.nan_to_num(hidden_states) residual = hidden_states normed_hidden_states = self.post_attention_layernorm(hidden_states) mlp_output = self.mlp(normed_hidden_states) hidden_states = residual + mlp_output hidden_states = torch.nan_to_num(hidden_states) if self.config.use_zero_memory and ( self.config.zero_memory_layers is None or self.layer_idx in self.config.zero_memory_layers ): routing_weights, selected_experts = self.router(hidden_states) residual = hidden_states hidden_states, (hx, cx), past_key_value = self.memory( hidden_states, previous_states, past_key_value=past_key_value, cache_position=cache_position, position_embeddings=position_embeddings, attention_mask=attention_mask, output_attentions=output_attentions, position_ids=position_ids, use_cache=use_cache, ) hidden_states = torch.nan_to_num(hidden_states) hx = torch.nan_to_num(hx) cx = torch.nan_to_num(cx) hidden_states = hidden_states * routing_weights.reshape( hidden_states.shape[:-1] ).unsqueeze(-1) hidden_states = residual + self.config.zero_memory_alpha * hidden_states hidden_states = torch.nan_to_num(hidden_states) outputs = (hidden_states,) + attn_outputs[1:] if self.config.use_zero_memory and ( self.config.zero_memory_layers is None or self.layer_idx in self.config.zero_memory_layers ): outputs += ((hx, cx),) else: outputs += (previous_states,) if use_cache: outputs += (past_key_value,) return outputs class NeuroBLASTMemory(nn.Module): def __init__( self, config: NeuroBLASTConfig, hidden_size: int = 256, num_heads: int = 4, scale_factor: int = 4, layer_idx: int = 0, with_hx: bool = True, precomputed_total_layers: Optional[int] = None, *args, **kwargs, ): super().__init__(*args, **kwargs) self.hidden_size = hidden_size self.num_heads = num_heads self.scale_factor = scale_factor self.clamp_value = config.clamp_value # Use precomputed_total_layers instead of hardcoded 100 for layer index shift layer_shift = ( precomputed_total_layers if precomputed_total_layers is not None else 100 ) self.layer_idx = layer_idx + layer_shift self.with_hx = with_hx self.kv_dim = ( config.kv_dim ) self.scaled_dim = hidden_size * scale_factor self.head_dim = self.kv_dim // config.num_attention_heads self.num_heads = self.hidden_size // self.head_dim self.norm1 = nn.LayerNorm(hidden_size) if self.with_hx: self.lin1 = nn.Linear(self.hidden_size, self.scaled_dim) self.lin2 = nn.Linear(self.hidden_size, self.scaled_dim) self.lin3 = nn.Linear(self.scaled_dim, self.hidden_size) self.lin4 = nn.Linear(self.hidden_size, self.scaled_dim) self.lin5 = nn.Linear(self.scaled_dim, self.hidden_size) if self.with_hx: self.lin6 = nn.Linear(self.scaled_dim, self.hidden_size) self.gate1 = nn.Linear(self.scaled_dim, self.scaled_dim) self.act1 = nn.SiLU() self.gate2 = nn.Linear(self.scaled_dim, self.scaled_dim) self.act2 = nn.SiLU() self.last_token_reg = nn.Linear(self.hidden_size, self.hidden_size, bias=True) self.prev_tokens_reg = nn.Linear(self.hidden_size, self.hidden_size, bias=True) self.norm2 = nn.LayerNorm(hidden_size) self.dropout = nn.Dropout(config.dropout) def forward( self, x: torch.Tensor, previous_state: tuple[torch.Tensor, torch.Tensor], position_embeddings: Tuple[torch.Tensor, torch.Tensor], attention_mask: Optional[torch.Tensor], position_ids: Optional[torch.LongTensor], past_key_value: Optional[Cache] = None, cache_position: Optional[torch.LongTensor] = None, output_attentions: Optional[bool] = False, use_cache: Optional[bool] = False, ): hx, cx = previous_state b, s, d = x.size() x = torch.nan_to_num(x) norm_x = self.norm1(x) norm_x = norm_x.view(b, s, self.num_heads, self.head_dim).transpose(1, 2) cos, sin = position_embeddings norm_x = apply_rotary_pos_emb_single(norm_x, cos, sin, position_ids) kv_seq_len = s if past_key_value is not None: cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} norm_x, _ = past_key_value.update( norm_x, torch.zeros((b, 1, kv_seq_len, d)), self.layer_idx, cache_kwargs, ) kv_seq_len = norm_x.shape[2] norm_x = norm_x.transpose(1, 2).contiguous() norm_x = norm_x.view(b, kv_seq_len, d) norm_x = torch.nan_to_num(norm_x) expanded_x = None shifted_x = torch.cat( [ torch.zeros((b, 1, d), device=x.device, dtype=x.dtype), (norm_x[:, :-1].contiguous()), ], dim=1, ).contiguous() shifted_x = torch.nan_to_num(shifted_x) # Replace NaNs with zeros prev_tokens_x = norm_x.cumsum(dim=1) prev_tokens_x = prev_tokens_x - shifted_x prev_tokens_x = torch.nan_to_num(prev_tokens_x)[:, -s:].contiguous() if self.with_hx: expanded_x = self.lin1(norm_x[:, -s:].contiguous()) expanded_x = torch.nan_to_num(expanded_x) expanded_shifted_x = self.lin2(shifted_x[:, -s:].contiguous()) expanded_shifted_x = torch.nan_to_num(expanded_shifted_x) gated_shifted_x = self.gate1(expanded_shifted_x) gated_shifted_x = self.act1(gated_shifted_x) gated_shifted_x = torch.clamp( gated_shifted_x, min=-self.clamp_value, max=self.clamp_value ) gated_shifted_x = torch.nan_to_num(gated_shifted_x) collapsed_shifted_x = self.lin3(gated_shifted_x) collapsed_shifted_x = torch.nan_to_num(collapsed_shifted_x) prev_tokens_x = torch.nan_to_num(prev_tokens_x) expanded_prev_tokens_x = self.lin4(prev_tokens_x) expanded_prev_tokens_x = torch.nan_to_num(expanded_prev_tokens_x) gated_prev_tokens_x = self.gate2(expanded_prev_tokens_x) gated_prev_tokens_x = self.act2(gated_prev_tokens_x) gated_prev_tokens_x = torch.clamp( gated_prev_tokens_x, min=-self.clamp_value, max=self.clamp_value ) gated_prev_tokens_x = torch.nan_to_num(gated_prev_tokens_x) collapsed_prev_tokens_x = self.lin5(gated_prev_tokens_x) collapsed_prev_tokens_x = torch.nan_to_num(collapsed_prev_tokens_x) if self.with_hx: weights = torch.softmax(expanded_x * expanded_shifted_x, dim=-1) expanded_x_attn = weights * expanded_x expanded_x_attn = torch.nan_to_num(expanded_x_attn) hx = hx + self.lin6(expanded_x_attn) hx = torch.nan_to_num(hx) if self.with_hx: x = torch.nan_to_num(x) hx = torch.nan_to_num(hx) collapsed_shifted_x = torch.nan_to_num(collapsed_shifted_x) collapsed_prev_tokens_x = torch.nan_to_num(collapsed_prev_tokens_x) output = x + ( hx * ( self.last_token_reg(collapsed_shifted_x) + self.prev_tokens_reg(collapsed_prev_tokens_x) ) ) output = torch.nan_to_num(output) else: output = ( x + ( self.last_token_reg(collapsed_shifted_x) + self.prev_tokens_reg(collapsed_prev_tokens_x) )[:, -s:].contiguous() ) output = self.norm2(output) output = self.dropout(output) output = torch.nan_to_num(output) return ( output, (hx, cx), past_key_value, ) class NeuroBLASTPreTrainedModel(PreTrainedModel): config_class = NeuroBLASTConfig base_model_prefix = "brain" supports_gradient_checkpointing = True _no_split_modules = [] _keys_to_ignore_on_load_unexpected = [r"decoder\.version"] _supports_flash_attn_2 = False _supports_sdpa = True def _init_weights(self, module): std = getattr(self.config, "initializer_range", 0.02) if isinstance(module, nn.Linear): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() elif isinstance(module, nn.LayerNorm): if module.bias is not None: module.bias.data.zero_() module.weight.data.fill_(1.0) class NeuroBLASTModel(NeuroBLASTPreTrainedModel): def __init__(self, config: NeuroBLASTConfig): super(NeuroBLASTModel, self).__init__(config) self.config = config self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size self.embed_tokens = nn.Embedding( config.vocab_size, config.hidden_size, # Using main hidden_size for embeddings now padding_idx=self.padding_idx, ) self.rotary_emb = NeuroBLASTRotaryEmbedding( config.kv_dim // config.num_attention_heads, max_position_embeddings=config.max_position_embeddings, base=config.rope_theta, ) self.dropout = nn.Dropout(config.dropout) self.assoc_to_sensory_pooler = nn.Sequential( nn.Linear(config.hidden_size, config.hidden_size), nn.Identity(), # Backward compatibility - previously LayerNorm, but we found that removing it improve generalization nn.GELU(), nn.LayerNorm(config.hidden_size), ) self.assoc_to_motor_pooler = nn.Sequential( nn.Linear(config.hidden_size, config.hidden_size), nn.Identity(), # Backward compatibility nn.GELU(), nn.LayerNorm(config.hidden_size), ) self.sensory_to_motor_pooler = nn.Sequential( nn.Linear(config.hidden_size, config.hidden_size), nn.Identity(), # Backward compatibility nn.GELU(), nn.LayerNorm(config.hidden_size), ) # --- Cortex Layers --- # Using generic AttentionBlock with specific Self/Cross Attention modules passed in total_layers = 0 # Precompute total layers for Memory layer indexing before creating any layers precomputed_total_layers = ( config.num_association_cortex_layers + config.num_sensory_cortex_layers * 2 + config.num_motor_cortex_layers * 3 ) self.precomputed_total_layers = precomputed_total_layers config.precomputed_total_layers = precomputed_total_layers # 1. Association Cortex (Self-Attention) self.association_cortex = nn.ModuleList() for i in range(config.num_association_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to association cortex") self.association_cortex.append( AttentionBlock( config, config.hidden_size, # Use main hidden_size attention_module=SelfAttention( config, config.hidden_size, is_causal=True, layer_idx=layer_idx ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=False, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_association_cortex_layers # 2. Sensory Cortex (Self-Attention + Cross-Attention to Association) self.sensory_self_attn_layers = nn.ModuleList() self.sensory_cross_attn_layers = ( nn.ModuleList() ) # One cross-attn per self-attn layer for i in range(config.num_sensory_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to sensory cortex") self.sensory_self_attn_layers.append( AttentionBlock( config, config.hidden_size, attention_module=SelfAttention( config, config.hidden_size, is_causal=True, layer_idx=layer_idx, ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=False, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_sensory_cortex_layers for i in range(config.num_sensory_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to sensory cross-attention") # Add Cross-Attention layer: Sensory queries, Association is K/V source self.sensory_cross_attn_layers.append( AttentionBlock( config, config.hidden_size, # Query Dim attention_module=CrossAttention( config, query_dim=config.hidden_size, kv_dim=config.kv_dim, # Assoc output dim layer_idx=layer_idx, ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=True, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_sensory_cortex_layers # 3. Motor Cortex (Self-Attention + Cross-Attention to Sensory + Cross-Attention to Association) self.motor_self_attn_layers = nn.ModuleList() self.motor_cross_sensory_layers = nn.ModuleList() self.motor_cross_assoc_layers = nn.ModuleList() for i in range(config.num_motor_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to motor cortex") self.motor_self_attn_layers.append( AttentionBlock( config, config.hidden_size, attention_module=SelfAttention( config, config.hidden_size, is_causal=True, layer_idx=layer_idx, ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=False, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_motor_cortex_layers for i in range(config.num_motor_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to motor cross-sensory") # Cross-Attend to Sensory Output self.motor_cross_sensory_layers.append( AttentionBlock( config, config.hidden_size, # Query Dim attention_module=CrossAttention( config, query_dim=config.hidden_size, kv_dim=config.kv_dim, # Sensory output dim layer_idx=layer_idx, ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=True, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_motor_cortex_layers for i in range(config.num_motor_cortex_layers): layer_idx = total_layers + i print(f"Adding layer {layer_idx} to motor cross-association") # Cross-Attend to Association Output self.motor_cross_assoc_layers.append( AttentionBlock( config, config.hidden_size, # Query Dim attention_module=CrossAttention( config, query_dim=config.hidden_size, kv_dim=config.kv_dim, # Assoc output dim layer_idx=layer_idx, ), mlp_module=( NeuroBLASTSparseMoeBlock( config, ) if config.num_experts else NeuroBLASTMoeMLP(config) ), is_cross_attention=True, layer_idx=layer_idx, precomputed_total_layers=precomputed_total_layers, ) ) total_layers += config.num_motor_cortex_layers # Initialize more conservatively to prevent strong gradient flow initially self.sensory_cross_assoc_gate = NeuroBLASTMoeMLP( config, ) self.motor_cross_sensory_gate = NeuroBLASTMoeMLP( config, ) self.motor_cross_assoc_gate = NeuroBLASTMoeMLP( config, ) # Final normalization before output head self.norm = nn.LayerNorm( config.hidden_size, eps=getattr(config, "rms_norm_eps", 1e-5) ) self.gradient_checkpointing = False self.post_init() def forward( self, input_ids: torch.LongTensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[ Cache ] = None, inputs_embeds: Optional[torch.FloatTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, cache_position: Optional[torch.LongTensor] = None, ): output_attentions = ( output_attentions if output_attentions is not None else self.config.output_attentions ) output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) use_cache = use_cache if use_cache is not None else self.config.use_cache # use_cache = False return_dict = ( return_dict if return_dict is not None else self.config.use_return_dict ) if input_ids is not None and inputs_embeds is not None: raise ValueError("Specify either input_ids or inputs_embeds") batch_size, seq_length = ( input_ids.shape if input_ids is not None else inputs_embeds.shape[:2] ) if self.gradient_checkpointing and self.training: if use_cache: logger.warning_once( "`use_cache=True` incompatible with gradient checkpointing. Setting `use_cache=False`" ) use_cache = False if not any(param.requires_grad for param in self.parameters()): logger.warning_once( "No parameters require gradients. Disabling gradient checkpointing to avoid warnings." ) self.gradient_checkpointing = False if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) past_key_values_length = 0 if use_cache: if not isinstance(past_key_values, Cache): past_key_values = DynamicCache.from_legacy_cache(past_key_values) past_key_values_length = past_key_values.get_seq_length() if cache_position is None: past_seen_tokens = ( past_key_values.get_seq_length() if past_key_values is not None else 0 ) cache_position = torch.arange( past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device, ) if position_ids is None: position_ids = cache_position.unsqueeze(0) causal_mask = self._update_causal_mask( attention_mask, inputs_embeds, cache_position, past_key_values, output_attentions, ) hidden_states = inputs_embeds cos, sin = self.rotary_emb( hidden_states, seq_len=seq_length + past_key_values_length ) position_embeddings = (cos, sin) all_hidden_states = () if output_hidden_states else None all_attentions = ( () if output_attentions else None ) next_decoder_cache = ( past_key_values if use_cache else None ) if self.config.use_zero_memory: hx = torch.ones( (batch_size, seq_length, hidden_states.size(-1)), device=hidden_states.device, dtype=hidden_states.dtype, ) cx = torch.ones( (batch_size, seq_length, hidden_states.size(-1)), device=hidden_states.device, dtype=hidden_states.dtype, ) if self.training: hx.requires_grad_() cx.requires_grad_() else: hx = None cx = None # 1. Association Cortex (Self-Attention) assoc_output = hidden_states for i, layer in enumerate(self.association_cortex): if output_hidden_states: all_hidden_states += (assoc_output,) if self.gradient_checkpointing and self.training: outputs = torch.utils.checkpoint.checkpoint( layer, assoc_output, position_embeddings, causal_mask, position_ids, None, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs = layer( assoc_output, position_embeddings, causal_mask, position_ids, kv_states=None, past_key_value=next_decoder_cache, cache_position=cache_position, output_attentions=output_attentions, use_cache=use_cache, previous_states=(hx, cx), ) assoc_output = outputs[0] hx, cx = outputs[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs[1],) if use_cache: next_decoder_cache = outputs[-1] else: next_decoder_cache = None sensory_state = self.assoc_to_sensory_pooler(assoc_output) sensory_state = apply_gradient_scaling( sensory_state, self.config.association_gradient_scale, self.config.gradient_scaling_enabled, ) # 2. Sensory Cortex (Self-Attention + Cross-Attention to Association) for i in range(self.config.num_sensory_cortex_layers): if output_hidden_states: all_hidden_states += (sensory_state,) self_attn_layer = self.sensory_self_attn_layers[i] if self.gradient_checkpointing and self.training: outputs_self = torch.utils.checkpoint.checkpoint( self_attn_layer, sensory_state, position_embeddings, causal_mask, position_ids, None, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs_self = self_attn_layer( sensory_state, position_embeddings, causal_mask, position_ids, kv_states=None, past_key_value=next_decoder_cache, cache_position=cache_position, output_attentions=output_attentions, use_cache=use_cache, previous_states=(hx, cx), ) sensory_state = outputs_self[0] hx, cx = outputs_self[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs_self[1],) if use_cache: next_decoder_cache = outputs_self[-1] else: next_decoder_cache = None cross_attn_layer = self.sensory_cross_attn_layers[i] cross_attn_causal_mask = None if causal_mask is not None: q_seq_len = sensory_state.size(1) kv_seq_len = assoc_output.size(1) cross_attn_causal_mask = torch.ones( (batch_size, 1, q_seq_len, kv_seq_len), device=hidden_states.device, dtype=hidden_states.dtype, ) causal_mask_upper = torch.triu( torch.ones((q_seq_len, kv_seq_len), device=hidden_states.device), diagonal=1, ) cross_attn_causal_mask = cross_attn_causal_mask.masked_fill( causal_mask_upper.unsqueeze(0).unsqueeze(0).bool(), torch.finfo(hidden_states.dtype).min, ) if self.gradient_checkpointing and self.training: outputs_cross = torch.utils.checkpoint.checkpoint( cross_attn_layer, sensory_state, position_embeddings, cross_attn_causal_mask, position_ids, assoc_output, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs_cross = cross_attn_layer( sensory_state, position_embeddings, past_key_value=next_decoder_cache, cache_position=cache_position, attention_mask=cross_attn_causal_mask, position_ids=position_ids, kv_states=apply_gradient_scaling( assoc_output, self.config.cross_attention_gradient_scale, self.config.gradient_scaling_enabled, ), output_attentions=output_attentions, use_cache=use_cache, previous_states=(hx, cx), ) cross_contribution = nn.functional.layer_norm( outputs_cross[0], normalized_shape=(self.config.hidden_size,), eps=getattr(self.config, "rms_norm_eps", 1e-5), ) sensory_state = sensory_state + self.sensory_cross_assoc_gate( cross_contribution ) hx, cx = outputs_cross[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs_cross[1],) if use_cache: next_decoder_cache = outputs_cross[-1] else: next_decoder_cache = None motor_state = self.sensory_to_motor_pooler(sensory_state) motor_state = apply_gradient_scaling( motor_state, self.config.sensory_gradient_scale, self.config.gradient_scaling_enabled, ) motor_state_from_assoc = self.assoc_to_motor_pooler(assoc_output) motor_state_from_assoc = apply_gradient_scaling( motor_state_from_assoc, self.config.association_gradient_scale, self.config.gradient_scaling_enabled, ) motor_state = motor_state + motor_state_from_assoc # Combine pooled inputs # 3. Motor Cortex (Self + Cross-Sensory + Cross-Association) for i in range(self.config.num_motor_cortex_layers): if output_hidden_states: all_hidden_states += (motor_state,) self_attn_layer = self.motor_self_attn_layers[i] if self.gradient_checkpointing and self.training: outputs_self = torch.utils.checkpoint.checkpoint( self_attn_layer, motor_state, position_embeddings, causal_mask, position_ids, None, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs_self = self_attn_layer( motor_state, position_embeddings, causal_mask, position_ids, kv_states=None, past_key_value=next_decoder_cache, cache_position=cache_position, output_attentions=output_attentions, use_cache=use_cache, previous_states=(hx, cx), ) motor_state = outputs_self[0] hx, cx = outputs_self[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs_self[1],) if use_cache: next_decoder_cache = outputs_self[-1] else: next_decoder_cache = None cross_sensory_layer = self.motor_cross_sensory_layers[i] motor_cross_sensory_mask = None if causal_mask is not None: motor_q_seq_len = motor_state.size(1) sensory_kv_seq_len = sensory_state.size( 1 ) motor_cross_sensory_mask = torch.ones( (batch_size, 1, motor_q_seq_len, sensory_kv_seq_len), device=hidden_states.device, dtype=hidden_states.dtype, ) causal_mask_upper = torch.triu( torch.ones( (motor_q_seq_len, sensory_kv_seq_len), device=hidden_states.device, ), diagonal=1, ) motor_cross_sensory_mask = motor_cross_sensory_mask.masked_fill( causal_mask_upper.unsqueeze(0).unsqueeze(0).bool(), torch.finfo(hidden_states.dtype).min, ) if self.gradient_checkpointing and self.training: outputs_cross_sensory = torch.utils.checkpoint.checkpoint( cross_sensory_layer, motor_state, position_embeddings, motor_cross_sensory_mask, position_ids, sensory_state, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs_cross_sensory = cross_sensory_layer( motor_state, position_embeddings, attention_mask=motor_cross_sensory_mask, position_ids=position_ids, kv_states=apply_gradient_scaling( sensory_state, self.config.cross_attention_gradient_scale, self.config.gradient_scaling_enabled, ), output_attentions=output_attentions, past_key_value=next_decoder_cache, cache_position=cache_position, use_cache=use_cache, previous_states=(hx, cx), ) motor_state = motor_state + self.motor_cross_sensory_gate( outputs_cross_sensory[0] ) hx, cx = outputs_cross_sensory[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs_cross_sensory[1],) if use_cache: next_decoder_cache = outputs_cross_sensory[-1] else: next_decoder_cache = None cross_assoc_layer = self.motor_cross_assoc_layers[i] motor_cross_assoc_mask = None if causal_mask is not None: motor_q_seq_len = motor_state.size(1) assoc_kv_seq_len = assoc_output.size( 1 ) motor_cross_assoc_mask = torch.ones( (batch_size, 1, motor_q_seq_len, assoc_kv_seq_len), device=hidden_states.device, dtype=hidden_states.dtype, ) causal_mask_upper = torch.triu( torch.ones( (motor_q_seq_len, assoc_kv_seq_len), device=hidden_states.device ), diagonal=1, ) motor_cross_assoc_mask = motor_cross_assoc_mask.masked_fill( causal_mask_upper.unsqueeze(0).unsqueeze(0).bool(), torch.finfo(hidden_states.dtype).min, ) if self.gradient_checkpointing and self.training: outputs_cross_assoc = torch.utils.checkpoint.checkpoint( cross_assoc_layer, motor_state, position_embeddings, motor_cross_assoc_mask, position_ids, assoc_output, next_decoder_cache, cache_position, output_attentions, use_cache, (hx, cx), ) else: outputs_cross_assoc = cross_assoc_layer( motor_state, position_embeddings, attention_mask=motor_cross_assoc_mask, position_ids=position_ids, kv_states=apply_gradient_scaling( assoc_output, self.config.cross_attention_gradient_scale, self.config.gradient_scaling_enabled, ), output_attentions=output_attentions, past_key_value=next_decoder_cache, cache_position=cache_position, use_cache=use_cache, previous_states=(hx, cx), ) motor_state = motor_state + self.motor_cross_assoc_gate( outputs_cross_assoc[0] ) hx, cx = outputs_cross_assoc[-1 if not use_cache else -2] if output_attentions: all_attentions += (outputs_cross_assoc[1],) if use_cache: next_decoder_cache = outputs_cross_assoc[-1] else: next_decoder_cache = None final_output = self.norm(motor_state) if output_hidden_states: all_hidden_states += (final_output,) if not return_dict: outputs_tuple = (final_output,) if use_cache: outputs_tuple += (next_decoder_cache,) if output_hidden_states: outputs_tuple += (all_hidden_states,) if output_attentions: outputs_tuple += (all_attentions,) return tuple(v for v in outputs_tuple if v is not None) return BaseModelOutputWithPast( last_hidden_state=final_output, past_key_values=next_decoder_cache, hidden_states=all_hidden_states, attentions=all_attentions, ) def get_input_embeddings(self): return self.embed_tokens def set_input_embeddings(self, value): self.embed_tokens = value def _update_causal_mask( self, attention_mask: torch.Tensor, input_tensor: torch.Tensor, cache_position: torch.Tensor, past_key_values: Cache, output_attentions: bool, ): if self.config._attn_implementation == "flash_attention_2": if attention_mask is not None and 0.0 in attention_mask: return attention_mask return None past_seen_tokens = ( past_key_values.get_seq_length() if past_key_values is not None else 0 ) using_static_cache = isinstance(past_key_values, StaticCache) if ( self.config._attn_implementation == "sdpa" and not using_static_cache and not output_attentions ): if AttentionMaskConverter._ignore_causal_mask_sdpa( attention_mask, inputs_embeds=input_tensor, past_key_values_length=past_seen_tokens, is_training=self.training, ): return None dtype, device = input_tensor.dtype, input_tensor.device min_dtype = torch.finfo(dtype).min sequence_length = input_tensor.shape[1] if using_static_cache: target_length = ( getattr( past_key_values, "get_max_length", lambda: past_key_values.get_seq_length(), )() if hasattr(past_key_values, "get_seq_length") else sequence_length + past_seen_tokens ) else: target_length = ( attention_mask.shape[-1] if isinstance(attention_mask, torch.Tensor) else past_seen_tokens + sequence_length + 1 ) causal_mask = _prepare_4d_causal_attention_mask_with_cache_position( attention_mask, sequence_length=sequence_length, target_length=target_length, dtype=dtype, device=device, min_dtype=min_dtype, cache_position=cache_position, batch_size=input_tensor.shape[0], ) if ( self.config._attn_implementation == "sdpa" and attention_mask is not None and attention_mask.device.type == "cuda" and not output_attentions ): causal_mask = AttentionMaskConverter._unmask_unattended( causal_mask, min_dtype ) return causal_mask class NeuroBLASTForCausalLM(NeuroBLASTPreTrainedModel, GenerationMixin): _tied_weights_keys = ["lm_head.weight"] def __init__(self, config: NeuroBLASTConfig): super().__init__(config) self.config = config self.model = NeuroBLASTModel(config) self.vocab_size = config.vocab_size # Ensure vocab_size is accessible self.lm_head = torch.nn.Linear(config.hidden_size, self.vocab_size, bias=False) self.loss_steps = 0 self.post_init() # Initialize weights def get_input_embeddings(self): return self.model.get_input_embeddings() def set_input_embeddings(self, value): self.model.set_input_embeddings(value) def get_output_embeddings(self): return self.lm_head def set_output_embeddings(self, new_embeddings): self.lm_head = new_embeddings def tie_weights(self): if getattr(self.config, "tie_word_embeddings", False): output_embeddings = self.get_output_embeddings() input_embeddings = self.get_input_embeddings() output_embeddings.weight = input_embeddings.weight if getattr(output_embeddings, "bias", None) is not None: output_embeddings.bias.data.zero_() def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Cache] = None, inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, cache_position: Optional[torch.LongTensor] = None, **loss_kwargs, ) -> Union[Tuple, CausalLMOutputWithPast]: return_dict = ( return_dict if return_dict is not None else self.config.use_return_dict ) outputs = self.model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, cache_position=cache_position, ) hidden_states = outputs[0] logits = self.lm_head(hidden_states) logits = logits.float() loss = None if labels is not None: loss = self.loss_function( logits=logits, labels=labels, vocab_size=self.config.vocab_size, **loss_kwargs, ) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return CausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=outputs.past_key_values, hidden_states=hidden_states, attentions=outputs.attentions, ) def get_input_embeddings(self): return self.model.get_input_embeddings() def set_input_embeddings(self, value): self.model.set_input_embeddings(value) def __str__(self): return f"NeuroBLASTForCausalLM(config={self.config})"