NeuroBLAST-1.9B-Instruct-Early-Preview / modeling_neuroblast.py
mkurman's picture
Upload 11 files
2fe4bd1 verified
raw
history blame
72.7 kB
import torch
import math
from torch import nn
import torch.nn.functional as F
from transformers import PreTrainedModel, GenerationMixin
from transformers.cache_utils import DynamicCache, Cache
from transformers.modeling_attn_mask_utils import AttentionMaskConverter
from transformers.cache_utils import Cache, DynamicCache, StaticCache
from transformers.utils import logging
from transformers.modeling_outputs import (
BaseModelOutputWithPast,
CausalLMOutputWithPast,
)
from transformers.activations import ACT2FN
from typing import Optional, Tuple, Union, List
from .configuration_neuroblast import NeuroBLASTConfig
CLAMP_VALUE = 1e5
logger = logging.get_logger(__name__)
class _GradientScale(torch.autograd.Function):
    """Autograd op: copies the input forward, multiplies the gradient backward.

    Defined once at module level so the Function subclass is not re-created
    on every call to `apply_gradient_scaling`.
    """

    @staticmethod
    def forward(ctx, input_tensor, scale_factor):
        ctx.scale = scale_factor
        # Return a copy rather than the input tensor object itself.
        return input_tensor.clone()

    @staticmethod
    def backward(ctx, grad_output):
        if grad_output is None:
            return None, None
        # Second slot is the gradient w.r.t. `scale_factor`, which has none.
        return grad_output * ctx.scale, None


def apply_gradient_scaling(
    tensor: torch.Tensor, scale: float, enabled: bool = True
) -> torch.Tensor:
    """
    Apply gradient scaling to a tensor.

    This scales the gradients during backward pass while keeping forward pass unchanged.

    Args:
        tensor: Tensor whose incoming gradient should be scaled.
        scale: Factor the gradient is multiplied by during backward.
        enabled: When False the tensor is returned untouched.

    Returns:
        `tensor` itself when scaling is disabled, `scale == 1.0`, or the tensor
        does not require grad; otherwise a cloned tensor with the same values
        whose backward pass multiplies the gradient by `scale`.
    """
    if not enabled or scale == 1.0 or not tensor.requires_grad:
        return tensor
    return _GradientScale.apply(tensor, scale)
def _prepare_4d_causal_attention_mask_with_cache_position(
    attention_mask: torch.Tensor,
    sequence_length: int,
    target_length: int,
    dtype: torch.dtype,
    device: torch.device,
    min_dtype: float,
    cache_position: torch.Tensor,
    batch_size: int,
):
    """Build an additive causal mask expanded to (batch_size, 1, sequence_length, target_length).

    Blocked positions hold `min_dtype`; allowed positions hold 0. A mask that
    is already 4D is returned unchanged. Otherwise a causal mask is built from
    `cache_position`, and, when `attention_mask` is given, positions where the
    causal mask plus `attention_mask` sums to 0 are additionally filled with
    `min_dtype`.
    """
    # A 4D mask is taken as-is.
    if attention_mask is not None and attention_mask.dim() == 4:
        return attention_mask

    mask = torch.full(
        (sequence_length, target_length),
        fill_value=min_dtype,
        dtype=dtype,
        device=device,
    )
    if sequence_length != 1:
        mask = torch.triu(mask, diagonal=1)
    # Keep the fill value only for key slots strictly after each query's cache position.
    key_slots = torch.arange(target_length, device=device)
    mask *= key_slots > cache_position.reshape(-1, 1)
    mask = mask[None, None, :, :].expand(batch_size, 1, -1, -1)
    if attention_mask is None:
        return mask

    # `expand` yields a view; clone before writing into it.
    mask = mask.clone()
    mask_length = attention_mask.shape[-1]
    covered = mask[:, :, :, :mask_length]
    is_padding = (covered + attention_mask[:, None, None, :]) == 0
    mask[:, :, :, :mask_length] = covered.masked_fill(is_padding, min_dtype)
    return mask
# --- RMSNorm and RoPE implementations (adapted from the HF Llama modules) ---
class LlamaRMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learnable gain."""

    def __init__(self, hidden_size, eps=1e-6):
        """
        LlamaRMSNorm is equivalent to T5LayerNorm
        """
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states):
        """Normalise in float32, cast back to the input dtype, then apply the gain."""
        original_dtype = hidden_states.dtype
        upcast = hidden_states.to(torch.float32)
        mean_square = upcast.pow(2).mean(-1, keepdim=True)
        normalised = upcast * torch.rsqrt(mean_square + self.variance_epsilon)
        return self.weight * normalised.to(original_dtype)

    def extra_repr(self):
        weight_shape = tuple(self.weight.shape)
        return f"{weight_shape}, eps={self.variance_epsilon}"
class NeuroBLASTRotaryEmbedding(nn.Module):
    """
    Rotary Positional Embedding for NeuroBLAST model.

    Source: LlamaRotaryEmbedding

    Keeps cos/sin lookup tables (`cos_cached`, `sin_cached`) as non-persistent
    buffers with one row per position and `dim` columns. The tables grow when
    a longer sequence is requested.
    """

    def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        self.base = base
        inv_freq = 1.0 / (
            self.base
            ** (torch.arange(0, self.dim, 2, dtype=torch.float32).to(device) / self.dim)
        )
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        # Build the initial tables on the same device as `inv_freq`; using a
        # hard-coded "cpu" here fails in the einsum when `device` is not the CPU.
        self._set_cos_sin_cache(
            seq_len=max_position_embeddings,
            device=self.inv_freq.device,
            dtype=torch.float32,
        )

    def _set_cos_sin_cache(self, seq_len, device, dtype):
        """(Re)build the cos/sin tables for positions `0 .. seq_len - 1`."""
        self.max_seq_len_cached = seq_len
        inv_freq = self.inv_freq.to(device)
        t = torch.arange(self.max_seq_len_cached, device=device, dtype=inv_freq.dtype)
        freqs = torch.einsum("i,j->ij", t, inv_freq)
        # Duplicate the frequencies so the table width equals `dim`.
        emb = torch.cat((freqs, freqs), dim=-1)
        self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
        self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)

    def forward(self, x, seq_len=None):
        """Return `(cos, sin)` tables truncated to `seq_len` rows.

        Args:
            x: Tensor used only for its device and dtype.
            seq_len: Number of positions required. When omitted, every cached
                position is returned (previously `None` raised a `TypeError`).
        """
        if seq_len is None:
            seq_len = self.max_seq_len_cached
        if seq_len > self.max_seq_len_cached:
            self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
        elif self.cos_cached.device != x.device or self.cos_cached.dtype != x.dtype:
            self.cos_cached = self.cos_cached.to(device=x.device, dtype=x.dtype)
            self.sin_cached = self.sin_cached.to(device=x.device, dtype=x.dtype)
        return (
            self.cos_cached[:seq_len],
            self.sin_cached[:seq_len],
        )
def rotate_half(x):
    """Rotates half the hidden dims of the input."""
    midpoint = x.shape[-1] // 2
    front, back = x[..., :midpoint], x[..., midpoint:]
    # Negated second half goes first, followed by the first half.
    return torch.cat((-back, front), dim=-1)
def apply_rotary_pos_emb(q, k, cos, sin, position_ids):
    """Applies rotary positional embeddings to query and key tensors.

    `cos` and `sin` are indexed by `position_ids` and given an extra axis at
    dim 1 before being combined with `q` and `k`.
    """
    # Original annotation: [bs, 1, seq_len, dim] — assumes position_ids is (bs, seq_len).
    cos_at_pos = cos[position_ids].unsqueeze(1)
    sin_at_pos = sin[position_ids].unsqueeze(1)

    def _rotate(states):
        return (states * cos_at_pos) + (rotate_half(states) * sin_at_pos)

    return _rotate(q), _rotate(k)
# Overload for Cross Attention where only query is rotated
def apply_rotary_pos_emb_single(q, cos, sin, position_ids):
    """Applies rotary positional embeddings to query tensor.

    Single-tensor variant of `apply_rotary_pos_emb`: only `q` is rotated.
    """
    # Gather the table rows for the given positions and add an axis at dim 1.
    cos_at_pos = cos[position_ids].unsqueeze(1)
    sin_at_pos = sin[position_ids].unsqueeze(1)
    rotated = rotate_half(q)
    return (q * cos_at_pos) + (rotated * sin_at_pos)
class SwiGLUMLP(nn.Module):
    """SwiGLU MLP block

    Computes `down_proj(SiLU(gate_proj(x)) * up_proj(x))` with the gated
    activation clamped to `±config.clamp_value` and NaNs replaced, followed by
    dropout. When `config.num_experts` is set, a sparse MoE block is applied
    to that output and added back to it.
    """

    def __init__(self, hidden_size, config: NeuroBLASTConfig, dropout):
        super().__init__()
        intermediate_size = getattr(config, "intermediate_size", int(hidden_size * 2.5))
        self.init_std = getattr(config, "initializer_range", 0.02)
        self.clamp_value = config.clamp_value

        self.gate_proj = nn.Linear(hidden_size, intermediate_size, bias=True)
        self.up_proj = nn.Linear(hidden_size, intermediate_size, bias=False)
        self.down_proj = nn.Linear(intermediate_size, hidden_size, bias=False)
        self.act_fn = nn.SiLU()
        self.dropout = nn.Dropout(dropout)
        if config.num_experts is not None:
            self.experts = NeuroBLASTSparseMoeBlock(config)

        # Scaled initialisation: input-side projections are scaled by the input
        # width, the output projection by the (larger) intermediate width.
        input_side_std = self.init_std / math.sqrt(hidden_size)
        output_side_std = self.init_std / math.sqrt(intermediate_size)
        with torch.no_grad():
            self.gate_proj.weight.data.normal_(mean=0.0, std=input_side_std)
            if self.gate_proj.bias is not None:
                self.gate_proj.bias.data.zero_()
            self.up_proj.weight.data.normal_(mean=0.0, std=input_side_std)
            self.down_proj.weight.data.normal_(mean=0.0, std=output_side_std)

    def forward(self, x):
        gated = self.act_fn(self.gate_proj(x)) * self.up_proj(x)
        # Clamp before the down projection, then scrub any NaNs.
        limit = self.clamp_value
        gated = torch.nan_to_num(torch.clamp(gated, min=-limit, max=limit))
        out = self.dropout(self.down_proj(gated))
        if not hasattr(self, "experts"):
            return out
        return out + self.experts(out)
class NeuroBLASTMoeMLP(nn.Module):
    """Gated MLP used as a single expert. Source: Qwen3MoeMLP

    The down-projected output is clamped to `±config.clamp_value` and has its
    NaNs replaced before being returned.
    """

    def __init__(self, config, intermediate_size=None):
        super().__init__()
        self.config = config
        self.clamp_value = config.clamp_value
        self.hidden_size = config.hidden_size
        # Fall back to the config's width only when no override is supplied.
        if intermediate_size is None:
            intermediate_size = config.intermediate_size
        self.intermediate_size = intermediate_size
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
        self.act_fn = ACT2FN[config.hidden_act]

    def forward(self, x):
        gated = self.act_fn(self.gate_proj(x))
        projected = self.down_proj(gated * self.up_proj(x))
        limit = self.clamp_value
        clamped = torch.clamp(projected, min=-limit, max=limit)
        return torch.nan_to_num(clamped)  # Safeguard against NaNs
class NeuroBLASTSparseMoeBlock(nn.Module):
    """Top-k routed mixture of `NeuroBLASTMoeMLP` experts. Source: Qwen3SparseMoeBlock"""

    def __init__(self, config):
        super().__init__()
        self.num_experts = config.num_experts
        self.top_k = config.num_experts_per_tok
        self.norm_topk_prob = config.norm_topk_prob

        # Router producing one logit per expert.
        self.gate = nn.Linear(config.hidden_size, config.num_experts, bias=False)
        expert_modules = [NeuroBLASTMoeMLP(config) for _ in range(self.num_experts)]
        self.experts = nn.ModuleList(expert_modules)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Route each token to its top-k experts and sum their weighted outputs.

        Takes a 3D tensor (unpacked as batch, sequence, hidden) and returns a
        tensor of the same shape.
        """
        batch_size, sequence_length, hidden_dim = hidden_states.shape
        hidden_states = hidden_states.view(-1, hidden_dim)

        # (batch * sequence_length, n_experts)
        router_logits = self.gate(hidden_states)
        expert_probs = F.softmax(router_logits, dim=1, dtype=torch.float)
        topk_weights, topk_experts = torch.topk(expert_probs, self.top_k, dim=-1)
        if self.norm_topk_prob:  # only diff with mixtral sparse moe block!
            topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True)
        # Softmax ran in float32; return to the activations' dtype.
        topk_weights = topk_weights.to(hidden_states.dtype)

        combined = torch.zeros(
            (batch_size * sequence_length, hidden_dim),
            dtype=hidden_states.dtype,
            device=hidden_states.device,
        )

        # One-hot the selections, permuted to (expert, top-k slot, token).
        assignment = F.one_hot(topk_experts, num_classes=self.num_experts).permute(
            2, 1, 0
        )

        for expert_idx in range(self.num_experts):
            slot_ids, token_ids = torch.where(assignment[expert_idx])
            expert_input = hidden_states[None, token_ids].reshape(-1, hidden_dim)
            expert_output = self.experts[expert_idx](expert_input)
            weighted_output = expert_output * topk_weights[token_ids, slot_ids, None]
            # Accumulate each token's contribution from this expert.
            combined.index_add_(0, token_ids, weighted_output.to(hidden_states.dtype))

        return combined.reshape(batch_size, sequence_length, hidden_dim)
class NeuroBLASTRouterBlock(nn.Module):
    """Memory router; overcomplicated due to backward compatibility

    A two-way softmax gate with top-1 selection. Returns the selected
    weight and index per token.
    """

    def __init__(
        self,
        config,
        hidden_size,
    ):
        super().__init__()
        self.num_experts = 2
        self.top_k = 1
        self.norm_topk_prob = config.norm_topk_prob

        # Router producing one logit per (virtual) expert.
        self.gate = nn.Linear(hidden_size, self.num_experts, bias=False)

    def forward(self, hidden_states: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return `(routing_weights, selected_experts)`, each of shape (batch * seq, top_k)."""
        _, _, hidden_dim = hidden_states.shape
        flat_states = hidden_states.view(-1, hidden_dim)

        # (batch * sequence_length, n_experts)
        logits = self.gate(flat_states)
        probs = F.softmax(logits, dim=1, dtype=torch.float)
        weights, selected = torch.topk(probs, self.top_k, dim=-1)
        if self.norm_topk_prob:  # only diff with mixtral sparse moe block!
            weights = weights / weights.sum(dim=-1, keepdim=True)
        # Softmax ran in float32; return to the activations' dtype.
        return weights.to(flat_states.dtype), selected
class SelfAttention(torch.nn.Module):
    """Multi-head self-attention with a fused QKV projection and rotary embeddings.

    Queries, keys and values are projected to `config.kv_dim`, split into
    heads, rotated with RoPE and optionally appended to a key/value cache.
    Attention is computed either with `F.scaled_dot_product_attention`
    (when `config.use_flash_attn` is set) or with an explicit
    matmul/softmax path that can return the attention weights.
    """

    def __init__(
        self,
        config: NeuroBLASTConfig,
        hidden_size: int,
        is_causal: bool = False,
        layer_idx: Optional[int] = None,
    ):
        super().__init__()
        # Only consulted by the explicit (non-SDPA) path when no mask is given.
        self.is_causal = is_causal
        self.dropout_p = config.dropout  # Will apply based on self.training
        self.layer_idx = layer_idx
        self.hidden_size = hidden_size
        self.intermediate_size = config.kv_dim
        self.use_flash_attn = config.use_flash_attn

        # Allow overriding num_heads, default to config.num_attention_heads
        self.num_heads = getattr(
            config, f"num_heads_{layer_idx}", config.num_attention_heads
        )
        self.head_dim = self.intermediate_size // self.num_heads

        if (self.head_dim * self.num_heads) != self.intermediate_size:
            # The value being validated is config.kv_dim, so name it as such.
            raise ValueError(
                f"Layer {self.layer_idx}: kv_dim ({self.intermediate_size}) must be divisible by num_heads ({self.num_heads})"
            )

        self.qkv_proj = nn.Linear(
            self.hidden_size, self.intermediate_size * 3, bias=True
        )
        self.o_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
        self.dropout = nn.Dropout(config.dropout)

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],  # (cos, sin)
        attention_mask: Optional[
            torch.Tensor
        ],  # Not directly used by flash_attn causal
        past_key_value: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
        position_ids: Optional[torch.LongTensor] = None,
    ):
        """Run self-attention over `hidden_states`.

        Args:
            hidden_states: Input of shape (batch, seq_len, hidden_size).
            position_embeddings: `(cos, sin)` tables indexed by `position_ids`.
            attention_mask: Optional mask. On the SDPA path a 4D mask is cut
                to the key length; on the explicit path it is added to the
                attention scores.
            past_key_value: Optional cache updated with this layer's keys/values.
            cache_position: Forwarded to the cache update.
            output_attentions: Append the attention weights to the result
                (they are `None` on the SDPA path).
            use_cache: Append `past_key_value` to the result.
            position_ids: Positions used to index the rotary tables.

        Returns:
            Tuple `(attn_output,)`, followed by the attention weights when
            `output_attentions` is set and by `past_key_value` when
            `use_cache` is set.
        """
        batch_size, seq_len, _ = hidden_states.shape
        dropout_p = self.dropout_p if self.training else 0.0

        qkv = self.qkv_proj(hidden_states)
        query_states, key_states, value_states = qkv.chunk(3, dim=-1)

        # (batch, seq_len, kv_dim) -> (batch, num_heads, seq_len, head_dim)
        query_states = query_states.view(
            batch_size, seq_len, self.num_heads, self.head_dim
        ).transpose(1, 2)
        key_states = key_states.view(
            batch_size, seq_len, self.num_heads, self.head_dim
        ).transpose(1, 2)
        value_states = value_states.view(
            batch_size, seq_len, self.num_heads, self.head_dim
        ).transpose(1, 2)

        cos, sin = position_embeddings
        query_states, key_states = apply_rotary_pos_emb(
            query_states, key_states, cos, sin, position_ids
        )

        if past_key_value is not None:
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            # When using Cache object, updating happens in-place.
            key_states, value_states = past_key_value.update(
                key_states, value_states, self.layer_idx, cache_kwargs
            )

        if self.use_flash_attn:
            causal_mask = attention_mask
            if attention_mask is not None:
                if attention_mask.dim() == 4:
                    # Trim the mask to the number of keys actually present.
                    causal_mask = attention_mask[:, :, :, : key_states.shape[-2]]
                elif attention_mask.dim() == 2:
                    causal_mask = attention_mask

                if causal_mask.dtype not in [
                    torch.bool,
                    torch.float16,
                    torch.float32,
                    torch.bfloat16,
                ]:
                    causal_mask = causal_mask.to(query_states.dtype)

            # Let SDPA build the causal mask only when none was supplied.
            is_causal = causal_mask is None and query_states.shape[-2] > 1

            attn_output = F.scaled_dot_product_attention(
                query_states,
                key_states,
                value_states,
                attn_mask=causal_mask,
                dropout_p=dropout_p,
                enable_gqa=False,
                scale=self.head_dim**-0.5,
                is_causal=is_causal,
            )
            attn_weights = None
        else:
            attn_weights = torch.matmul(query_states, key_states.transpose(-1, -2)) / (
                self.head_dim**0.5
            )

            if attention_mask is not None:
                attn_weights = attn_weights + attention_mask
            elif self.is_causal and seq_len > 1:
                causal_mask = torch.triu(
                    torch.ones(
                        (seq_len, key_states.shape[2]),
                        dtype=torch.bool,
                        device=query_states.device,
                    ),
                    diagonal=1,
                )
                attn_weights = attn_weights.masked_fill(
                    causal_mask.unsqueeze(0).unsqueeze(0), float("-inf")
                )

            attn_weights = F.softmax(attn_weights, dim=-1)
            attn_weights_dropped = F.dropout(
                attn_weights, p=dropout_p, training=self.training
            )
            attn_output = torch.matmul(attn_weights_dropped, value_states)

        # (batch, num_heads, seq_len, head_dim) -> (batch, seq_len, kv_dim)
        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.view(batch_size, seq_len, -1)
        attn_output = self.o_proj(attn_output)
        attn_output = self.dropout(attn_output)

        # Warn only when weights were requested but the SDPA path produced none;
        # the explicit path does return them.
        if output_attentions and attn_weights is None:
            logger.warning(
                f"Layer {self.layer_idx}: Flash Attention does not return attention weights."
            )

        outputs = (attn_output,)
        if output_attentions:
            outputs += (attn_weights,)
        if use_cache:
            outputs += (past_key_value,)  # Return the cache object
        return outputs
class CrossAttention(torch.nn.Module):
    """Multi-head attention whose queries and keys/values come from different tensors.

    Queries are projected from `query_states`; keys and values are projected
    from `kv_states`. Only the queries receive rotary embeddings. Keys and
    values are handed to the cache *before* being split into heads.
    """

    def __init__(
        self,
        config: NeuroBLASTConfig,
        query_dim: int,
        kv_dim: int,
        layer_idx: int,
        is_causal: bool = True,
    ):
        super().__init__()
        self.dropout_p = config.dropout
        self.layer_idx = layer_idx
        self.query_dim = query_dim
        self.kv_dim = kv_dim
        # Only consulted by the explicit (non-SDPA) path when no mask is given.
        self.is_causal = is_causal

        self.num_heads = config.num_attention_heads
        self.head_dim = self.kv_dim // self.num_heads
        # Kept as a separate attribute; it is always equal to `head_dim`.
        self.kv_head_dim = self.kv_dim // self.num_heads

        # A single check covers both head sizes because they are identical;
        # the message names the dimension that is actually validated.
        if (self.head_dim * self.num_heads) != self.kv_dim:
            raise ValueError(
                f"CrossAttn {layer_idx}: kv_dim ({kv_dim}) must be divisible by num_heads ({self.num_heads})"
            )

        self.q_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True)
        self.k_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True)
        self.v_proj = nn.Linear(self.query_dim, self.kv_dim, bias=True)
        self.o_proj = nn.Linear(self.kv_dim, self.query_dim, bias=False)
        self.dropout = nn.Dropout(config.dropout)

        # Use SDPA whenever the installed torch provides it.
        self.use_flash_attn = hasattr(F, "scaled_dot_product_attention")

    def forward(
        self,
        query_states: torch.Tensor,
        kv_states: torch.Tensor,
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],
        past_key_value: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = False,
        position_ids: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = False,
    ):
        """Attend from `query_states` over `kv_states`.

        Args:
            query_states: Source of the queries, (batch, q_seq_len, query_dim).
            kv_states: Source of keys and values; its dim 1 is the key length.
            position_embeddings: `(cos, sin)` tables indexed by `position_ids`.
            past_key_value: Optional cache updated with the projected keys/values.
            cache_position: Forwarded to the cache update.
            attention_mask: Optional mask passed to SDPA, or added to the
                scores on the explicit path.
            output_attentions: Append the attention weights to the result
                (they are `None` on the SDPA path).
            position_ids: Positions used to index the rotary tables.
            use_cache: Append `past_key_value` to the result.

        Returns:
            Tuple `(attn_output,)`, followed by the attention weights when
            `output_attentions` is set and by `past_key_value` when
            `use_cache` is set.
        """
        batch_size, q_seq_len, _ = query_states.shape
        kv_seq_len = kv_states.shape[1]
        dropout_p = self.dropout_p if self.training else 0.0

        query = self.q_proj(query_states)
        key = self.k_proj(kv_states)
        value = self.v_proj(kv_states)

        query = query.view(
            batch_size, q_seq_len, self.num_heads, self.head_dim
        ).transpose(1, 2)

        cos, sin = position_embeddings
        query = apply_rotary_pos_emb_single(query, cos, sin, position_ids)

        if past_key_value is not None:
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            key, value = past_key_value.update(
                key, value, self.layer_idx or 0, cache_kwargs
            )
            # Keys are still 3D here, so the sequence axis is dim 1.
            kv_seq_len = key.shape[1]

        key = key.view(
            batch_size, kv_seq_len, self.num_heads, self.kv_head_dim
        ).transpose(1, 2)
        value = value.view(
            batch_size, kv_seq_len, self.num_heads, self.kv_head_dim
        ).transpose(1, 2)

        sdpa_attn_mask = attention_mask

        if self.use_flash_attn:
            # Let SDPA build the causal mask only when none was supplied.
            is_causal = sdpa_attn_mask is None and q_seq_len > 1
            attn_output = F.scaled_dot_product_attention(
                query,
                key,
                value,
                attn_mask=sdpa_attn_mask if not is_causal else None,
                dropout_p=dropout_p,
                is_causal=is_causal,
                enable_gqa=False,
                scale=self.head_dim**-0.5,
            )
            attn_weights = None
        else:
            attn_weights = torch.matmul(query, key.transpose(-1, -2)) / (
                self.head_dim**0.5
            )
            if sdpa_attn_mask is not None:
                attn_weights = attn_weights + sdpa_attn_mask
            elif self.is_causal and q_seq_len > 1:
                causal_mask = torch.triu(
                    torch.ones(
                        (q_seq_len, kv_seq_len), dtype=torch.bool, device=query.device
                    ),
                    diagonal=1,
                )
                attn_weights = attn_weights.masked_fill(
                    causal_mask.unsqueeze(0).unsqueeze(0), float("-inf")
                )

            attn_weights = F.softmax(attn_weights, dim=-1)
            attn_weights_dropped = F.dropout(
                attn_weights, p=dropout_p, training=self.training
            )
            attn_output = torch.matmul(attn_weights_dropped, value)

        # (batch, num_heads, q_seq_len, head_dim) -> (batch, q_seq_len, kv_dim)
        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.reshape(batch_size, q_seq_len, self.kv_dim)
        attn_output = self.o_proj(attn_output)
        attn_output = self.dropout(attn_output)

        outputs = (attn_output,)
        if output_attentions:
            outputs += (attn_weights,)
        if use_cache:
            outputs += (past_key_value,)
        return outputs
class AttentionBlock(torch.nn.Module):
    """Modified Attention Block with Pre-Norm and choice of Self/Cross Attention & MLP

    Applies LayerNorm -> attention -> residual, then LayerNorm -> MLP ->
    residual. On layers where zero-memory is enabled, a routed
    `NeuroBLASTMemory` update is added on top, scaled by
    `config.zero_memory_alpha`. NaNs are scrubbed after every stage.
    """

    def __init__(
        self,
        config: NeuroBLASTConfig,
        hidden_size: int,
        attention_module: nn.Module,
        mlp_module: nn.Module,
        is_cross_attention: bool = False,
        layer_idx: int = 0,
        precomputed_total_layers: Optional[int] = None,
    ):
        super().__init__()
        self.hidden_size = hidden_size
        self.config = config
        self.layer_idx = layer_idx
        self.is_cross_attention = is_cross_attention

        norm_eps = getattr(config, "rms_norm_eps", 1e-5)
        self.input_layernorm = nn.LayerNorm(hidden_size, eps=norm_eps)
        self.attention = attention_module
        self.post_attention_layernorm = nn.LayerNorm(
            hidden_size, eps=getattr(config, "rms_norm_eps", 1e-5)
        )
        self.mlp = mlp_module

        if self._uses_zero_memory():
            self.router = NeuroBLASTRouterBlock(config, hidden_size)
            self.memory = NeuroBLASTMemory(
                config,
                hidden_size=hidden_size,
                layer_idx=layer_idx,
                precomputed_total_layers=precomputed_total_layers,
            )

    def _uses_zero_memory(self):
        """Whether the config enables zero-memory for this layer index."""
        cfg = self.config
        return cfg.use_zero_memory and (
            cfg.zero_memory_layers is None or self.layer_idx in cfg.zero_memory_layers
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor],
        position_ids: Optional[torch.LongTensor],
        kv_states: Optional[torch.Tensor] = None,
        past_key_value: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
        previous_states: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
    ):
        """Run the block.

        Returns a tuple laid out as: the new hidden states, everything the
        attention module returned after its first element, the memory state
        pair (updated `(hx, cx)` on memory layers, otherwise
        `previous_states` unchanged), and finally `past_key_value` when
        `use_cache` is set.

        Raises:
            ValueError: if this is a cross-attention block and `kv_states` is None.
        """
        with_memory = self._uses_zero_memory()

        # --- attention sub-layer (pre-norm + residual) ---
        skip = hidden_states
        hidden_states = torch.nan_to_num(hidden_states)
        attn_input = self.input_layernorm(hidden_states)

        if not self.is_cross_attention:
            attn_outputs = self.attention(
                attn_input,
                position_embeddings=position_embeddings,
                attention_mask=attention_mask,
                past_key_value=past_key_value,
                cache_position=cache_position,
                output_attentions=output_attentions,
                use_cache=use_cache,
                position_ids=position_ids,
            )
        else:
            if kv_states is None:
                raise ValueError("kv_states must be provided for CrossAttention")
            attn_outputs = self.attention(
                query_states=attn_input,
                kv_states=kv_states,
                past_key_value=past_key_value,
                cache_position=cache_position,
                position_embeddings=position_embeddings,
                attention_mask=attention_mask,
                output_attentions=output_attentions,
                position_ids=position_ids,
                use_cache=use_cache,
            )

        # The cache, when requested, is the attention module's last output.
        past_key_value = attn_outputs[-1] if use_cache else None
        hidden_states = torch.nan_to_num(skip + attn_outputs[0])

        # --- MLP sub-layer (pre-norm + residual) ---
        skip = hidden_states
        mlp_input = self.post_attention_layernorm(hidden_states)
        hidden_states = torch.nan_to_num(skip + self.mlp(mlp_input))

        # --- optional routed memory update ---
        if with_memory:
            routing_weights, selected_experts = self.router(hidden_states)

            skip = hidden_states
            hidden_states, (hx, cx), past_key_value = self.memory(
                hidden_states,
                previous_states,
                past_key_value=past_key_value,
                cache_position=cache_position,
                position_embeddings=position_embeddings,
                attention_mask=attention_mask,
                output_attentions=output_attentions,
                position_ids=position_ids,
                use_cache=use_cache,
            )
            hidden_states = torch.nan_to_num(hidden_states)
            hx = torch.nan_to_num(hx)
            cx = torch.nan_to_num(cx)

            # One routing weight per token, broadcast over the hidden axis.
            token_weights = routing_weights.reshape(hidden_states.shape[:-1])
            hidden_states = hidden_states * token_weights.unsqueeze(-1)
            hidden_states = skip + self.config.zero_memory_alpha * hidden_states
            hidden_states = torch.nan_to_num(hidden_states)
            state_entry = (hx, cx)
        else:
            state_entry = previous_states

        outputs = (hidden_states,) + attn_outputs[1:]
        outputs += (state_entry,)
        if use_cache:
            outputs += (past_key_value,)
        return outputs
class NeuroBLASTMemory(nn.Module):
    """Recurrent-style memory layer mixing each token with its predecessors.

    For every position the layer combines two gated projections — one of the
    previous token's normalised features (`shifted_x`) and one of the cumulative sum of
    normalised features minus that shifted tensor — and adds them to the
    input, optionally multiplied by a running state `hx`. The normalised,
    rotary-embedded features are stored in the key slot of the KV cache under
    a shifted layer index so they do not collide with attention layers.
    """

    def __init__(
        self,
        config: NeuroBLASTConfig,
        hidden_size: int = 256,
        num_heads: int = 4,
        scale_factor: int = 4,
        layer_idx: int = 0,
        with_hx: bool = True,
        precomputed_total_layers: Optional[int] = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.hidden_size = hidden_size
        self.scale_factor = scale_factor
        self.clamp_value = config.clamp_value
        # Use precomputed_total_layers instead of hardcoded 100 for layer index shift
        layer_shift = (
            precomputed_total_layers if precomputed_total_layers is not None else 100
        )
        self.layer_idx = layer_idx + layer_shift
        self.with_hx = with_hx
        self.kv_dim = config.kv_dim
        self.scaled_dim = hidden_size * scale_factor
        self.head_dim = self.kv_dim // config.num_attention_heads
        # The `num_heads` argument is accepted for compatibility only; the
        # effective head count is derived from hidden_size and head_dim.
        self.num_heads = self.hidden_size // self.head_dim

        self.norm1 = nn.LayerNorm(hidden_size)
        if self.with_hx:
            self.lin1 = nn.Linear(self.hidden_size, self.scaled_dim)
        self.lin2 = nn.Linear(self.hidden_size, self.scaled_dim)
        self.lin3 = nn.Linear(self.scaled_dim, self.hidden_size)
        self.lin4 = nn.Linear(self.hidden_size, self.scaled_dim)
        self.lin5 = nn.Linear(self.scaled_dim, self.hidden_size)
        if self.with_hx:
            self.lin6 = nn.Linear(self.scaled_dim, self.hidden_size)
        self.gate1 = nn.Linear(self.scaled_dim, self.scaled_dim)
        self.act1 = nn.SiLU()
        self.gate2 = nn.Linear(self.scaled_dim, self.scaled_dim)
        self.act2 = nn.SiLU()
        self.last_token_reg = nn.Linear(self.hidden_size, self.hidden_size, bias=True)
        self.prev_tokens_reg = nn.Linear(self.hidden_size, self.hidden_size, bias=True)
        self.norm2 = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(config.dropout)

    def forward(
        self,
        x: torch.Tensor,
        previous_state: tuple[torch.Tensor, torch.Tensor],
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor],
        position_ids: Optional[torch.LongTensor],
        past_key_value: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
    ):
        """Apply the memory update.

        Args:
            x: Input of shape (batch, seq, hidden).
            previous_state: `(hx, cx)` pair; `cx` is passed through unchanged.
            position_embeddings: `(cos, sin)` tables indexed by `position_ids`.
            attention_mask: Unused here; accepted for a uniform call signature.
            position_ids: Positions used to index the rotary tables.
            past_key_value: Optional cache holding features of earlier tokens.
            cache_position: Forwarded to the cache update.
            output_attentions: Unused here.
            use_cache: Unused here.

        Returns:
            `(output, (hx, cx), past_key_value)` where `output` has the same
            shape as `x`.
        """
        hx, cx = previous_state
        b, s, d = x.size()
        x = torch.nan_to_num(x)

        norm_x = self.norm1(x)
        # Split into heads so the rotary embedding can be applied per head.
        norm_x = norm_x.view(b, s, self.num_heads, self.head_dim).transpose(1, 2)

        cos, sin = position_embeddings
        norm_x = apply_rotary_pos_emb_single(norm_x, cos, sin, position_ids)

        kv_seq_len = s
        if past_key_value is not None:
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            # Only the key slot carries data. The value slot gets a zero
            # placeholder, created on x's device/dtype so the cache never has
            # to combine tensors living on different devices.
            placeholder_values = torch.zeros(
                (b, 1, kv_seq_len, d), device=x.device, dtype=x.dtype
            )
            norm_x, _ = past_key_value.update(
                norm_x,
                placeholder_values,
                self.layer_idx,
                cache_kwargs,
            )
            # The cache returns features for all tokens seen so far.
            kv_seq_len = norm_x.shape[2]

        norm_x = norm_x.transpose(1, 2).contiguous()
        norm_x = norm_x.view(b, kv_seq_len, d)
        norm_x = torch.nan_to_num(norm_x)

        expanded_x = None

        # Shift right by one position (zero-padded at the start).
        shifted_x = torch.cat(
            [
                torch.zeros((b, 1, d), device=x.device, dtype=x.dtype),
                (norm_x[:, :-1].contiguous()),
            ],
            dim=1,
        ).contiguous()
        shifted_x = torch.nan_to_num(shifted_x)  # Replace NaNs with zeros

        # Running sum of features with the shifted tensor subtracted; only
        # the last `s` positions (the current chunk) are kept.
        prev_tokens_x = norm_x.cumsum(dim=1)
        prev_tokens_x = prev_tokens_x - shifted_x
        prev_tokens_x = torch.nan_to_num(prev_tokens_x)[:, -s:].contiguous()

        if self.with_hx:
            expanded_x = self.lin1(norm_x[:, -s:].contiguous())
            expanded_x = torch.nan_to_num(expanded_x)

        # --- previous-token branch ---
        expanded_shifted_x = self.lin2(shifted_x[:, -s:].contiguous())
        expanded_shifted_x = torch.nan_to_num(expanded_shifted_x)
        gated_shifted_x = self.gate1(expanded_shifted_x)
        gated_shifted_x = self.act1(gated_shifted_x)
        gated_shifted_x = torch.clamp(
            gated_shifted_x, min=-self.clamp_value, max=self.clamp_value
        )
        gated_shifted_x = torch.nan_to_num(gated_shifted_x)
        collapsed_shifted_x = self.lin3(gated_shifted_x)
        collapsed_shifted_x = torch.nan_to_num(collapsed_shifted_x)

        # --- earlier-tokens branch ---
        prev_tokens_x = torch.nan_to_num(prev_tokens_x)
        expanded_prev_tokens_x = self.lin4(prev_tokens_x)
        expanded_prev_tokens_x = torch.nan_to_num(expanded_prev_tokens_x)
        gated_prev_tokens_x = self.gate2(expanded_prev_tokens_x)
        gated_prev_tokens_x = self.act2(gated_prev_tokens_x)
        gated_prev_tokens_x = torch.clamp(
            gated_prev_tokens_x, min=-self.clamp_value, max=self.clamp_value
        )
        gated_prev_tokens_x = torch.nan_to_num(gated_prev_tokens_x)
        collapsed_prev_tokens_x = self.lin5(gated_prev_tokens_x)
        collapsed_prev_tokens_x = torch.nan_to_num(collapsed_prev_tokens_x)

        if self.with_hx:
            # Softmax-weighted interaction of current and previous-token
            # features accumulates into the running state hx.
            weights = torch.softmax(expanded_x * expanded_shifted_x, dim=-1)
            expanded_x_attn = weights * expanded_x
            expanded_x_attn = torch.nan_to_num(expanded_x_attn)
            hx = hx + self.lin6(expanded_x_attn)
            hx = torch.nan_to_num(hx)

        if self.with_hx:
            x = torch.nan_to_num(x)
            hx = torch.nan_to_num(hx)
            collapsed_shifted_x = torch.nan_to_num(collapsed_shifted_x)
            collapsed_prev_tokens_x = torch.nan_to_num(collapsed_prev_tokens_x)
            output = x + (
                hx
                * (
                    self.last_token_reg(collapsed_shifted_x)
                    + self.prev_tokens_reg(collapsed_prev_tokens_x)
                )
            )
            output = torch.nan_to_num(output)
        else:
            output = (
                x
                + (
                    self.last_token_reg(collapsed_shifted_x)
                    + self.prev_tokens_reg(collapsed_prev_tokens_x)
                )[:, -s:].contiguous()
            )

        output = self.norm2(output)
        output = self.dropout(output)
        output = torch.nan_to_num(output)

        return (
            output,
            (hx, cx),
            past_key_value,
        )
class NeuroBLASTPreTrainedModel(PreTrainedModel):
    """Base class wiring NeuroBLAST into the Hugging Face `PreTrainedModel` machinery.

    Declares the config class and capability flags, and provides the default
    weight initialisation for Linear, Embedding and LayerNorm modules.
    """

    config_class = NeuroBLASTConfig
    base_model_prefix = "brain"
    supports_gradient_checkpointing = True
    _no_split_modules = []
    _keys_to_ignore_on_load_unexpected = [r"decoder\.version"]
    _supports_flash_attn_2 = False
    _supports_sdpa = True

    def _init_weights(self, module):
        """Initialise `module` in place; module types not listed below are left untouched."""
        init_std = getattr(self.config, "initializer_range", 0.02)

        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=init_std)
            if module.bias is not None:
                module.bias.data.zero_()
            return

        if isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=init_std)
            # The padding row is zeroed after the normal draw.
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
            return

        if isinstance(module, nn.LayerNorm):
            if module.bias is not None:
                module.bias.data.zero_()
            module.weight.data.fill_(1.0)
class NeuroBLASTModel(NeuroBLASTPreTrainedModel):
def __init__(self, config: NeuroBLASTConfig):
    """Build embeddings, the three cortex stacks, the poolers and the gates.

    Layer indices are assigned consecutively in construction order:
    association self-attention, sensory self-attention, sensory
    cross-attention, motor self-attention, motor cross-sensory and finally
    motor cross-association.

    Side effect: ``config.precomputed_total_layers`` is set to the total
    number of attention blocks so that the blocks can read it.

    Args:
        config: Model configuration. Must provide the sizes read below
            (``hidden_size``, ``kv_dim``, ``num_*_cortex_layers``, ...).
    """
    super(NeuroBLASTModel, self).__init__(config)
    self.config = config
    self.padding_idx = config.pad_token_id
    self.vocab_size = config.vocab_size

    self.embed_tokens = nn.Embedding(
        config.vocab_size,
        config.hidden_size,  # Embeddings live in the main hidden size.
        padding_idx=self.padding_idx,
    )
    self.rotary_emb = NeuroBLASTRotaryEmbedding(
        config.kv_dim // config.num_attention_heads,
        max_position_embeddings=config.max_position_embeddings,
        base=config.rope_theta,
    )
    self.dropout = nn.Dropout(config.dropout)

    # Projections that hand one cortex's output over to the next cortex.
    self.assoc_to_sensory_pooler = self._build_pooler(config.hidden_size)
    self.assoc_to_motor_pooler = self._build_pooler(config.hidden_size)
    self.sensory_to_motor_pooler = self._build_pooler(config.hidden_size)

    # --- Cortex layers ---
    # The total must be known before the first block is created because it
    # is passed to every block.
    precomputed_total_layers = (
        config.num_association_cortex_layers
        + config.num_sensory_cortex_layers * 2
        + config.num_motor_cortex_layers * 3
    )
    self.precomputed_total_layers = precomputed_total_layers
    config.precomputed_total_layers = precomputed_total_layers

    next_layer_idx = 0

    # 1. Association cortex (self-attention).
    self.association_cortex = nn.ModuleList()
    next_layer_idx = self._extend_stack(
        self.association_cortex,
        config.num_association_cortex_layers,
        next_layer_idx,
        "association cortex",
        is_cross_attention=False,
    )

    # 2. Sensory cortex (self-attention + cross-attention to association),
    #    one cross-attention block per self-attention block.
    self.sensory_self_attn_layers = nn.ModuleList()
    self.sensory_cross_attn_layers = nn.ModuleList()
    next_layer_idx = self._extend_stack(
        self.sensory_self_attn_layers,
        config.num_sensory_cortex_layers,
        next_layer_idx,
        "sensory cortex",
        is_cross_attention=False,
    )
    next_layer_idx = self._extend_stack(
        self.sensory_cross_attn_layers,
        config.num_sensory_cortex_layers,
        next_layer_idx,
        "sensory cross-attention",
        is_cross_attention=True,
    )

    # 3. Motor cortex (self-attention + cross-attention to sensory +
    #    cross-attention to association).
    self.motor_self_attn_layers = nn.ModuleList()
    self.motor_cross_sensory_layers = nn.ModuleList()
    self.motor_cross_assoc_layers = nn.ModuleList()
    next_layer_idx = self._extend_stack(
        self.motor_self_attn_layers,
        config.num_motor_cortex_layers,
        next_layer_idx,
        "motor cortex",
        is_cross_attention=False,
    )
    next_layer_idx = self._extend_stack(
        self.motor_cross_sensory_layers,
        config.num_motor_cortex_layers,
        next_layer_idx,
        "motor cross-sensory",
        is_cross_attention=True,
    )
    self._extend_stack(
        self.motor_cross_assoc_layers,
        config.num_motor_cortex_layers,
        next_layer_idx,
        "motor cross-association",
        is_cross_attention=True,
    )

    # Gates applied to each cross-attention contribution before it is added
    # to the residual stream. They are plain NeuroBLASTMoeMLP modules; no
    # special initialisation is applied here.
    self.sensory_cross_assoc_gate = NeuroBLASTMoeMLP(config)
    self.motor_cross_sensory_gate = NeuroBLASTMoeMLP(config)
    self.motor_cross_assoc_gate = NeuroBLASTMoeMLP(config)

    # Final normalisation before the output head. The epsilon is read from
    # ``rms_norm_eps`` even though this is a LayerNorm.
    self.norm = nn.LayerNorm(
        config.hidden_size, eps=getattr(config, "rms_norm_eps", 1e-5)
    )

    self.gradient_checkpointing = False
    self.post_init()

@staticmethod
def _build_pooler(hidden_size):
    """Return the Linear -> Identity -> GELU -> LayerNorm hand-over projection.

    The ``nn.Identity`` stands where a LayerNorm used to be and is kept so
    that the sub-module indices (and therefore checkpoint keys) stay the same.
    """
    return nn.Sequential(
        nn.Linear(hidden_size, hidden_size),
        nn.Identity(),  # Backward compatibility placeholder (was LayerNorm).
        nn.GELU(),
        nn.LayerNorm(hidden_size),
    )

def _extend_stack(self, stack, num_layers, first_layer_idx, label, is_cross_attention):
    """Append ``num_layers`` attention blocks to ``stack``.

    Args:
        stack: ``nn.ModuleList`` that receives the new blocks.
        num_layers: Number of blocks to create.
        first_layer_idx: Global index given to the first new block.
        label: Human-readable stack name, used only for logging.
        is_cross_attention: Build ``CrossAttention`` blocks when true,
            causal ``SelfAttention`` blocks otherwise.

    Returns:
        The global index that the next stack should start from.
    """
    config = self.config
    for layer_idx in range(first_layer_idx, first_layer_idx + num_layers):
        logger.info("Adding layer %d to %s", layer_idx, label)
        if is_cross_attention:
            attention_module = CrossAttention(
                config,
                query_dim=config.hidden_size,
                kv_dim=config.kv_dim,
                layer_idx=layer_idx,
            )
        else:
            attention_module = SelfAttention(
                config,
                config.hidden_size,
                is_causal=True,
                layer_idx=layer_idx,
            )
        # A sparse mixture-of-experts MLP is used whenever experts are configured.
        if config.num_experts:
            mlp_module = NeuroBLASTSparseMoeBlock(config)
        else:
            mlp_module = NeuroBLASTMoeMLP(config)
        stack.append(
            AttentionBlock(
                config,
                config.hidden_size,
                attention_module=attention_module,
                mlp_module=mlp_module,
                is_cross_attention=is_cross_attention,
                layer_idx=layer_idx,
                precomputed_total_layers=self.precomputed_total_layers,
            )
        )
    return first_layer_idx + num_layers
def forward(
    self,
    input_ids: Optional[torch.LongTensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_values: Optional[Cache] = None,
    inputs_embeds: Optional[torch.FloatTensor] = None,
    use_cache: Optional[bool] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    cache_position: Optional[torch.LongTensor] = None,
):
    """Run the association, sensory and motor cortex stacks over the input.

    Exactly one of ``input_ids`` and ``inputs_embeds`` must be given. Flags
    left as ``None`` fall back to the corresponding ``self.config`` values.

    Returns:
        A ``BaseModelOutputWithPast`` when ``return_dict`` is true, otherwise
        a tuple ``(last_hidden_state[, cache][, hidden_states][, attentions])``
        with ``None`` entries removed.

    Raises:
        ValueError: If both or neither of ``input_ids`` / ``inputs_embeds``
            are provided.
    """
    output_attentions = (
        output_attentions
        if output_attentions is not None
        else self.config.output_attentions
    )
    output_hidden_states = (
        output_hidden_states
        if output_hidden_states is not None
        else self.config.output_hidden_states
    )
    use_cache = use_cache if use_cache is not None else self.config.use_cache
    return_dict = (
        return_dict if return_dict is not None else self.config.use_return_dict
    )

    # Reject "both" and "neither" alike; the latter used to surface as an
    # AttributeError on ``None.shape``.
    if (input_ids is None) == (inputs_embeds is None):
        raise ValueError("Specify either input_ids or inputs_embeds")

    batch_size, seq_length = (
        input_ids.shape if input_ids is not None else inputs_embeds.shape[:2]
    )

    if self.gradient_checkpointing and self.training:
        if use_cache:
            logger.warning_once(
                "`use_cache=True` incompatible with gradient checkpointing. Setting `use_cache=False`"
            )
            use_cache = False
        if not any(param.requires_grad for param in self.parameters()):
            logger.warning_once(
                "No parameters require gradients. Disabling gradient checkpointing to avoid warnings."
            )
            self.gradient_checkpointing = False

    if inputs_embeds is None:
        inputs_embeds = self.embed_tokens(input_ids)

    past_key_values_length = 0
    if use_cache:
        # Legacy tuple caches (and None) are wrapped into a DynamicCache.
        if not isinstance(past_key_values, Cache):
            past_key_values = DynamicCache.from_legacy_cache(past_key_values)
        past_key_values_length = past_key_values.get_seq_length()

    if cache_position is None:
        past_seen_tokens = (
            past_key_values.get_seq_length() if past_key_values is not None else 0
        )
        cache_position = torch.arange(
            past_seen_tokens,
            past_seen_tokens + inputs_embeds.shape[1],
            device=inputs_embeds.device,
        )
    if position_ids is None:
        position_ids = cache_position.unsqueeze(0)

    causal_mask = self._update_causal_mask(
        attention_mask,
        inputs_embeds,
        cache_position,
        past_key_values,
        output_attentions,
    )

    hidden_states = inputs_embeds
    cos, sin = self.rotary_emb(
        hidden_states, seq_len=seq_length + past_key_values_length
    )
    position_embeddings = (cos, sin)

    all_hidden_states = () if output_hidden_states else None
    all_attentions = () if output_attentions else None
    next_decoder_cache = past_key_values if use_cache else None

    # Recurrent (hx, cx) states threaded through every block.
    # NOTE(review): with ``use_zero_memory`` the states start as ones, not
    # zeros -- confirm that this is intended.
    if self.config.use_zero_memory:
        state_shape = (batch_size, seq_length, hidden_states.size(-1))
        hx = torch.ones(
            state_shape, device=hidden_states.device, dtype=hidden_states.dtype
        )
        cx = torch.ones(
            state_shape, device=hidden_states.device, dtype=hidden_states.dtype
        )
        if self.training:
            hx.requires_grad_()
            cx.requires_grad_()
    else:
        hx = None
        cx = None

    def run_block(layer, block_input, mask, kv_states=None):
        """Run one block and fold its side outputs into the loop state.

        Updates ``hx``/``cx``, the cache and the collected attentions, and
        returns the block's hidden-state output (``outputs[0]``).
        """
        nonlocal hx, cx, next_decoder_cache, all_attentions
        if kv_states is not None:
            # Scale gradients flowing into the K/V source on BOTH execution
            # paths; the checkpointed path used to skip this.
            kv_states = apply_gradient_scaling(
                kv_states,
                self.config.cross_attention_gradient_scale,
                self.config.gradient_scaling_enabled,
            )
        if self.gradient_checkpointing and self.training:
            # checkpoint() forwards positional arguments only.
            outputs = torch.utils.checkpoint.checkpoint(
                layer,
                block_input,
                position_embeddings,
                mask,
                position_ids,
                kv_states,
                next_decoder_cache,
                cache_position,
                output_attentions,
                use_cache,
                (hx, cx),
            )
        else:
            outputs = layer(
                block_input,
                position_embeddings,
                mask,
                position_ids,
                kv_states=kv_states,
                past_key_value=next_decoder_cache,
                cache_position=cache_position,
                output_attentions=output_attentions,
                use_cache=use_cache,
                previous_states=(hx, cx),
            )
        # The states tuple is the last element, or second to last when the
        # cache is appended after it.
        hx, cx = outputs[-2 if use_cache else -1]
        if output_attentions:
            all_attentions += (outputs[1],)
        next_decoder_cache = outputs[-1] if use_cache else None
        return outputs[0]

    cross_masks = {}

    def cross_mask(query_states, key_value_states):
        """Return the cross-attention mask for these lengths, built once."""
        if causal_mask is None:
            return None
        lengths = (query_states.size(1), key_value_states.size(1))
        if lengths not in cross_masks:
            cross_masks[lengths] = self._build_cross_attention_mask(
                batch_size,
                lengths[0],
                lengths[1],
                dtype=hidden_states.dtype,
                device=hidden_states.device,
            )
        return cross_masks[lengths]

    # 1. Association cortex (self-attention).
    assoc_output = hidden_states
    for layer in self.association_cortex:
        if output_hidden_states:
            all_hidden_states += (assoc_output,)
        assoc_output = run_block(layer, assoc_output, causal_mask)

    sensory_state = self.assoc_to_sensory_pooler(assoc_output)
    sensory_state = apply_gradient_scaling(
        sensory_state,
        self.config.association_gradient_scale,
        self.config.gradient_scaling_enabled,
    )

    # 2. Sensory cortex (self-attention + cross-attention to association).
    for i in range(self.config.num_sensory_cortex_layers):
        if output_hidden_states:
            all_hidden_states += (sensory_state,)
        sensory_state = run_block(
            self.sensory_self_attn_layers[i], sensory_state, causal_mask
        )
        cross_output = run_block(
            self.sensory_cross_attn_layers[i],
            sensory_state,
            cross_mask(sensory_state, assoc_output),
            kv_states=assoc_output,
        )
        # Only the sensory cross contribution is layer-normalised before
        # gating; the motor ones below are gated directly.
        cross_contribution = nn.functional.layer_norm(
            cross_output,
            normalized_shape=(self.config.hidden_size,),
            eps=getattr(self.config, "rms_norm_eps", 1e-5),
        )
        sensory_state = sensory_state + self.sensory_cross_assoc_gate(
            cross_contribution
        )

    motor_state = self.sensory_to_motor_pooler(sensory_state)
    motor_state = apply_gradient_scaling(
        motor_state,
        self.config.sensory_gradient_scale,
        self.config.gradient_scaling_enabled,
    )
    motor_state_from_assoc = self.assoc_to_motor_pooler(assoc_output)
    motor_state_from_assoc = apply_gradient_scaling(
        motor_state_from_assoc,
        self.config.association_gradient_scale,
        self.config.gradient_scaling_enabled,
    )
    motor_state = motor_state + motor_state_from_assoc  # Combine pooled inputs

    # 3. Motor cortex (self + cross-sensory + cross-association).
    for i in range(self.config.num_motor_cortex_layers):
        if output_hidden_states:
            all_hidden_states += (motor_state,)
        motor_state = run_block(
            self.motor_self_attn_layers[i], motor_state, causal_mask
        )
        cross_sensory_output = run_block(
            self.motor_cross_sensory_layers[i],
            motor_state,
            cross_mask(motor_state, sensory_state),
            kv_states=sensory_state,
        )
        motor_state = motor_state + self.motor_cross_sensory_gate(
            cross_sensory_output
        )
        cross_assoc_output = run_block(
            self.motor_cross_assoc_layers[i],
            motor_state,
            cross_mask(motor_state, assoc_output),
            kv_states=assoc_output,
        )
        motor_state = motor_state + self.motor_cross_assoc_gate(
            cross_assoc_output
        )

    final_output = self.norm(motor_state)
    if output_hidden_states:
        all_hidden_states += (final_output,)

    if not return_dict:
        outputs_tuple = (final_output,)
        if use_cache:
            outputs_tuple += (next_decoder_cache,)
        if output_hidden_states:
            outputs_tuple += (all_hidden_states,)
        if output_attentions:
            outputs_tuple += (all_attentions,)
        return tuple(v for v in outputs_tuple if v is not None)

    return BaseModelOutputWithPast(
        last_hidden_state=final_output,
        past_key_values=next_decoder_cache,
        hidden_states=all_hidden_states,
        attentions=all_attentions,
    )

@staticmethod
def _build_cross_attention_mask(batch_size, q_seq_len, kv_seq_len, dtype, device):
    """Build a ``(batch, 1, q, kv)`` causal mask for cross-attention.

    Positions strictly above the diagonal hold ``finfo(dtype).min``; all
    other positions hold ``1.0``.
    NOTE(review): allowed positions are 1.0 rather than the conventional
    0.0. As an additive bias this shifts every visible logit equally, so it
    should not change a softmax, but confirm against the attention modules.
    """
    mask = torch.ones(
        (batch_size, 1, q_seq_len, kv_seq_len), device=device, dtype=dtype
    )
    future_positions = torch.triu(
        torch.ones((q_seq_len, kv_seq_len), device=device),
        diagonal=1,
    )
    return mask.masked_fill(
        future_positions.unsqueeze(0).unsqueeze(0).bool(),
        torch.finfo(dtype).min,
    )
# Return the token embedding module stored in ``self.embed_tokens``.
def get_input_embeddings(self):
return self.embed_tokens
# Replace the token embedding module (``self.embed_tokens``) with ``value``.
def set_input_embeddings(self, value):
self.embed_tokens = value
def _update_causal_mask(
    self,
    attention_mask: torch.Tensor,
    input_tensor: torch.Tensor,
    cache_position: torch.Tensor,
    past_key_values: Cache,
    output_attentions: bool,
):
    """Build the attention mask handed to the self-attention blocks.

    Returns:
        * flash-attention-2: the 2D ``attention_mask`` if it contains a
          zero, otherwise ``None``;
        * SDPA without a static cache and without ``output_attentions``:
          ``None`` when ``AttentionMaskConverter`` reports that the mask can
          be ignored;
        * otherwise the 4D mask produced by
          ``_prepare_4d_causal_attention_mask_with_cache_position``.
    """
    attn_implementation = self.config._attn_implementation

    if attn_implementation == "flash_attention_2":
        if attention_mask is not None and 0.0 in attention_mask:
            return attention_mask
        return None

    past_seen_tokens = (
        past_key_values.get_seq_length() if past_key_values is not None else 0
    )
    using_static_cache = isinstance(past_key_values, StaticCache)

    if (
        attn_implementation == "sdpa"
        and not using_static_cache
        and not output_attentions
    ):
        if AttentionMaskConverter._ignore_causal_mask_sdpa(
            attention_mask,
            inputs_embeds=input_tensor,
            past_key_values_length=past_seen_tokens,
            is_training=self.training,
        ):
            return None

    dtype, device = input_tensor.dtype, input_tensor.device
    min_dtype = torch.finfo(dtype).min
    sequence_length = input_tensor.shape[1]

    if using_static_cache:
        # A static cache needs a mask as wide as its capacity. Prefer the
        # current accessor, then the older one, and only then fall back to
        # the number of tokens seen (the previous behaviour when
        # ``get_max_length`` was missing).
        max_length_getter = (
            getattr(past_key_values, "get_max_cache_shape", None)
            or getattr(past_key_values, "get_max_length", None)
            or past_key_values.get_seq_length
        )
        target_length = max_length_getter()
        if target_length is None:
            target_length = sequence_length + past_seen_tokens
    else:
        target_length = (
            attention_mask.shape[-1]
            if isinstance(attention_mask, torch.Tensor)
            else past_seen_tokens + sequence_length + 1
        )

    causal_mask = _prepare_4d_causal_attention_mask_with_cache_position(
        attention_mask,
        sequence_length=sequence_length,
        target_length=target_length,
        dtype=dtype,
        device=device,
        min_dtype=min_dtype,
        cache_position=cache_position,
        batch_size=input_tensor.shape[0],
    )

    if (
        attn_implementation == "sdpa"
        and attention_mask is not None
        and attention_mask.device.type == "cuda"
        and not output_attentions
    ):
        # Post-process the mask with the converter's ``_unmask_unattended``
        # helper; this is only done for SDPA with a CUDA attention mask.
        causal_mask = AttentionMaskConverter._unmask_unattended(
            causal_mask, min_dtype
        )

    return causal_mask
class NeuroBLASTForCausalLM(NeuroBLASTPreTrainedModel, GenerationMixin):
    """NeuroBLAST backbone with a linear language-modelling head on top."""

    _tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config: NeuroBLASTConfig):
        """Create the backbone and the bias-free vocabulary projection."""
        super().__init__(config)
        self.config = config
        self.model = NeuroBLASTModel(config)
        self.vocab_size = config.vocab_size  # Ensure vocab_size is accessible
        self.lm_head = torch.nn.Linear(config.hidden_size, self.vocab_size, bias=False)
        self.loss_steps = 0
        self.post_init()  # Initialize weights

    def get_input_embeddings(self):
        """Return the backbone's token embedding module."""
        return self.model.get_input_embeddings()

    def set_input_embeddings(self, value):
        """Replace the backbone's token embedding module with ``value``."""
        self.model.set_input_embeddings(value)

    def get_output_embeddings(self):
        """Return the language-modelling head."""
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        """Replace the language-modelling head with ``new_embeddings``."""
        self.lm_head = new_embeddings

    def tie_weights(self):
        """Share the input embedding matrix with the output head.

        Does nothing unless ``config.tie_word_embeddings`` is truthy. When
        the head has a bias, it is zeroed.
        """
        if not getattr(self.config, "tie_word_embeddings", False):
            return
        output_embeddings = self.get_output_embeddings()
        input_embeddings = self.get_input_embeddings()
        output_embeddings.weight = input_embeddings.weight
        if getattr(output_embeddings, "bias", None) is not None:
            output_embeddings.bias.data.zero_()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **loss_kwargs,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        """Run the backbone, project to vocabulary logits, optionally compute the loss.

        Args:
            labels: When given, ``self.loss_function`` is called with the
                float logits, these labels, the vocabulary size and
                ``loss_kwargs``.
            **loss_kwargs: Extra keyword arguments forwarded to the loss.
            Remaining arguments are passed to the backbone unchanged.

        Returns:
            ``CausalLMOutputWithPast`` when ``return_dict`` is true,
            otherwise ``([loss,] logits, *backbone_outputs[1:])``.
        """
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            cache_position=cache_position,
        )

        last_hidden_state = outputs[0]
        # Logits are upcast to float32 before the loss is computed.
        logits = self.lm_head(last_hidden_state).float()

        loss = None
        if labels is not None:
            loss = self.loss_function(
                logits=logits,
                labels=labels,
                vocab_size=self.config.vocab_size,
                **loss_kwargs,
            )

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            # Forward the backbone's per-layer tuple (None unless
            # output_hidden_states was requested) instead of the last
            # hidden-state tensor.
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def __str__(self):
        return f"NeuroBLASTForCausalLM(config={self.config})"