import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from pathlib import Path
from huggingface_hub import HfApi, create_repo
# NOTE: If you are trying to reproduce this result, you will need to do the following first:
# Go to the tokenizer_config.json and tokenizer.json for Qwen/Qwen3-4B in .cache/huggingface/hub/models--Qwen--Qwen3-4B/
# and remove the <think> and </think> token entries from those files (see the helper sketch below).
# Otherwise the tokenizer may still tokenize the thinking text as <think> and </think>, which are special tokens.
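# The same cleanup can be scripted instead of hand-editing the files. The sketch below is a
# minimal, optional helper (not part of the original workflow): it assumes the standard
# fast-tokenizer layout ("added_tokens_decoder" in tokenizer_config.json and "added_tokens"
# in tokenizer.json), and the function name is illustrative.
def strip_think_tokens_from_tokenizer_files(tokenizer_dir):
    tokens_to_drop = {"<think>", "</think>"}
    cfg_path = Path(tokenizer_dir) / "tokenizer_config.json"
    tok_path = Path(tokenizer_dir) / "tokenizer.json"
    # Drop the <think>/</think> entries from added_tokens_decoder in tokenizer_config.json
    cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
    cfg["added_tokens_decoder"] = {
        token_id: entry
        for token_id, entry in cfg.get("added_tokens_decoder", {}).items()
        if entry.get("content") not in tokens_to_drop
    }
    cfg_path.write_text(json.dumps(cfg, indent=2, ensure_ascii=False), encoding="utf-8")
    # Drop the <think>/</think> entries from added_tokens in tokenizer.json
    tok = json.loads(tok_path.read_text(encoding="utf-8"))
    tok["added_tokens"] = [
        entry for entry in tok.get("added_tokens", []) if entry.get("content") not in tokens_to_drop
    ]
    tok_path.write_text(json.dumps(tok, indent=2, ensure_ascii=False), encoding="utf-8")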
def remove_thinking_tokens_and_predict(model_path, test_messages=None):
"""
Remove thinking tokens from model embeddings and run predictions with chat template.
Consist of 3 steps:
1. Go into tokenizer_config.json and tokenizer.json and remove the thinking tokens
a) vocab.json and merges.txt do not contain any mention of the thinking tokens, so no need to edit those
2. Create a new embedding layer with truncated vocabulary
a) Remove the thinking tokens from the embedding layer
b) This is easily done by truncating the embedding layer to the minimum thinking token ID - 1
c) Since thinking tokens are the last tokens in the vocabulary, there is no adverse effect on the model's performance
3. Run predictions with chat template
"""
# 1. Load model and tokenizer
print("Loading model and tokenizer...")
model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto", torch_dtype=torch.bfloat16)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 2. Get thinking token IDs (<think> = 151667, </think> = 151668 in the Qwen3 tokenizer)
thinking_tokens = ["<think>", "</think>"]
thinking_token_ids = [151667, 151668]
pre_len = len(thinking_token_ids)
vocab = tokenizer.get_vocab()
for token in thinking_tokens:
if token in vocab:
token_id = vocab[token]
thinking_token_ids.append(token_id)
print(f"Found {token} with ID {token_id}")
print(f"Found {len(thinking_token_ids) - pre_len} additional thinking tokens")
if not thinking_token_ids:
print("No thinking tokens found!")
else:
# 3. Create new embedding layer with truncated vocabulary
print("Creating new embedding layer with truncated vocabulary...")
embedding_layer = None
if hasattr(model, 'embed_tokens'):
embedding_layer = model.embed_tokens
elif hasattr(model.model, 'embed_tokens'):
embedding_layer = model.model.embed_tokens
if embedding_layer:
# Find the minimum thinking token ID to determine the truncation point.
# This works because the thinking tokens occupy the last used positions in the vocabulary;
# every embedding row above them is unused padding kept for training efficiency.
min_thinking_id = min(thinking_token_ids)
original_vocab_size = embedding_layer.weight.shape[0]
tokenizer_vocab_size = len(tokenizer.get_vocab())
new_vocab_size = min_thinking_id
print(f"Original embedding size: {original_vocab_size}")
print(f"Tokenizer vocab size: {tokenizer_vocab_size}")
print(f"Difference: {original_vocab_size - tokenizer_vocab_size}")
print(f"New vocab size: {new_vocab_size}")
print(f"Removing {original_vocab_size - new_vocab_size} tokens")
# Check what tokens exist beyond the tokenizer vocab
vocab = tokenizer.get_vocab()
max_tokenizer_id = max(vocab.values()) if vocab else 0
print(f"Max token ID in tokenizer: {max_tokenizer_id}")
if original_vocab_size > max_tokenizer_id + 1:
print(f"There are {original_vocab_size - max_tokenizer_id - 1} embedding slots beyond the tokenizer vocab")
print("These might be:")
print("- Padding for computational efficiency")
print("- Reserved slots for future tokens")
print("- Unused embeddings from model training")
# Let's check if we can decode the tokens beyond the tokenizer vocab
print("\nTrying to decode tokens beyond tokenizer vocab:")
for token_id in range(max_tokenizer_id + 1, min(max_tokenizer_id + 10, original_vocab_size)):
try:
decoded = tokenizer.decode([token_id])
print(f"Token {token_id}: '{decoded}'")
except Exception as e:
print(f"Token {token_id}: Cannot decode ({e})")
# Create new embedding layer with truncated vocabulary
embedding_dim = embedding_layer.weight.shape[1]
new_embedding = torch.nn.Embedding(new_vocab_size, embedding_dim)
# Copy weights for the tokens we're keeping
with torch.no_grad():
new_embedding.weight.data = embedding_layer.weight[:new_vocab_size].clone()
# Replace the embedding layer in the model
if hasattr(model, 'embed_tokens'):
model.embed_tokens = new_embedding
print("Replaced model.embed_tokens")
print(f"New embedding layer shape: {model.embed_tokens.weight.shape}")
elif hasattr(model.model, 'embed_tokens'):
model.model.embed_tokens = new_embedding
print("Replaced model.model.embed_tokens")
print(f"New embedding layer shape: {model.model.embed_tokens.weight.shape}")
# Also update the output layer if it exists (for language modeling head)
if hasattr(model, 'lm_head') and model.lm_head.weight.shape[0] == original_vocab_size:
print("Updating language modeling head...")
new_lm_head = torch.nn.Linear(model.lm_head.in_features, new_vocab_size, bias=model.lm_head.bias is not None)
with torch.no_grad():
new_lm_head.weight.data = model.lm_head.weight[:new_vocab_size].clone()
if model.lm_head.bias is not None:
new_lm_head.bias.data = model.lm_head.bias[:new_vocab_size].clone()
model.lm_head = new_lm_head
print("Updated lm_head")
print(f"New lm_head shape: {model.lm_head.weight.shape}")
else:
if hasattr(model, 'lm_head'):
print(f"LM head not updated - original shape: {model.lm_head.weight.shape}")
else:
print("No lm_head found in model")
# Update model config if it exists
if hasattr(model, 'config') and hasattr(model.config, 'vocab_size'):
model.config.vocab_size = new_vocab_size
print(f"Updated model config vocab_size to {new_vocab_size}")
print(f"ā
Successfully created new embedding layer!")
print(f"Vocabulary truncated from {original_vocab_size} to {new_vocab_size} tokens")
print("Note: Since you've manually updated the JSON files, the tokenizer will automatically use the new vocab.")
else:
print("Could not find embedding layer to modify")
# 4. Verification - check how thinking tokens are now encoded
print("\nš Verification:")
vocab = tokenizer.get_vocab()
print(f"Vocabulary size: {len(vocab)}")
# Find and decode the last token (highest ID)
max_token_id = max(vocab.values())
last_token = tokenizer.decode([max_token_id])
print(f"Last token (ID {max_token_id}): '{last_token}'")
for token in thinking_tokens:
encoded = tokenizer.encode(token, add_special_tokens=False)
print(f"'{token}' encodes as: {encoded}")
# 4.5. Display chat template information
print("\nš Chat Template Information:")
if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template:
print("Chat template found:")
print("-" * 50)
print(tokenizer.chat_template)
print("-" * 50)
# 5. Run predictions with chat template
print("\nš¤ Running predictions...")
# Default test messages if none provided
if test_messages is None:
test_messages = [
[{"role": "user", "content": "What is 2+2? Think step by step."}],
[{"role": "user", "content": "Explain quantum computing in simple terms."}],
[{"role": "user", "content": "Write a short poem about AI."}]
]
model.eval()
print(test_messages)
for i, messages in enumerate(test_messages):
print(f"\n--- Test {i+1} ---")
print(messages)
print(f"Input: {messages[-1]['content']}")
# Apply chat template
if hasattr(tokenizer, 'apply_chat_template'):
formatted_input = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking = True
)
print(f"Formatted input: {formatted_input}...")
# Tokenize
inputs = tokenizer(formatted_input, return_tensors="pt")
print(inputs)
# Generate
with torch.no_grad():
# Move inputs to the model's device; device_map="auto" has already placed the model
inputs = inputs.to(model.device)
print(f"Running inference on device: {next(model.parameters()).device}")
outputs = model.generate(
**inputs,
max_new_tokens=200,
do_sample=True,
temperature=0.7,
pad_token_id=tokenizer.eos_token_id
)
# Decode response
response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[-1]:], skip_special_tokens=True)
print(f"Response: {response}")
# Check if thinking tokens appear in the response
has_thinking = any(token in response for token in thinking_tokens)
print(f"Contains thinking tokens: {has_thinking}")
else:
print("No chat template available for this tokenizer")
print("\nā
Prediction testing completed!")
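# Optional helper (a minimal sketch, assuming the Qwen-style module layout used above): checks
# that the truncated embedding matrix, lm_head, config, and tokenizer all agree on the new
# vocabulary size. The function name is illustrative.
def check_vocab_consistency(model, tokenizer):
    embedding_layer = model.embed_tokens if hasattr(model, 'embed_tokens') else model.model.embed_tokens
    rows = embedding_layer.weight.shape[0]
    assert model.config.vocab_size == rows, "config.vocab_size does not match the embedding matrix"
    if hasattr(model, 'lm_head'):
        assert model.lm_head.weight.shape[0] == rows, "lm_head rows do not match the embedding matrix"
    max_token_id = max(tokenizer.get_vocab().values())
    assert max_token_id < rows, "tokenizer still maps IDs beyond the truncated embedding"
    print(f"Vocab consistency OK: {rows} embedding rows, max tokenizer ID {max_token_id}")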
def save_model_to_hf(
model_path: str,
output_path: str,
*,
tokenizer_path: str | None = None,
truncate_thinking_tokens: bool = True,
push_to_hub: bool = True,
private: bool = False,
commit_message: str = "Upload model",
token: str | None = None,
):
"""
Save the (optionally stripped) model and tokenizer either locally or directly to the Hugging Face Hub.
Parameters
----------
model_path : str
Path or Repo ID from which to load the model.
output_path : str
Local directory to save to OR a repo id like ``username/model_name`` to push to the Hub.
tokenizer_path : str | None
Path to a tokenizer directory. If provided, the tokenizer from this directory will be used when saving/pushing.
truncate_thinking_tokens : bool, default=True
Whether to remove thinking tokens from the model embeddings before saving.
push_to_hub : bool, default=True
When True, `output_path` is interpreted as a Hub repo id and we push the artefacts.
When False, we always save to a local directory at `output_path`.
private : bool, default=False
If pushing, create the repository as private.
commit_message : str, default="Upload model"
Commit message to use when pushing.
token : str | None
A Hugging Face access token. If ``None`` we fall back to the one stored by ``huggingface-cli login``.
"""
print(f"š Loading model from {model_path}...")
model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto", torch_dtype=torch.bfloat16)
# Prefer tokenizer from `tokenizer_path` if provided (e.g. stripped of tokens)
tokenizer_source = tokenizer_path if tokenizer_path is not None else model_path
tokenizer = AutoTokenizer.from_pretrained(tokenizer_source)
if tokenizer_path is not None:
print(f"šļø Loaded tokenizer from {tokenizer_path} (will be used when saving/pushing).")
# Apply thinking token truncation if requested
if truncate_thinking_tokens:
print("šŖ Truncating thinking tokens from embeddings...")
thinking_token_ids = [151667, 151668]
embedding_layer = None
if hasattr(model, 'embed_tokens'):
embedding_layer = model.embed_tokens
elif hasattr(model.model, 'embed_tokens'):
embedding_layer = model.model.embed_tokens
if embedding_layer and thinking_token_ids:
min_thinking_id = min(thinking_token_ids)
original_vocab_size = embedding_layer.weight.shape[0]
new_vocab_size = min_thinking_id
print(f"Truncating vocabulary from {original_vocab_size} to {new_vocab_size} tokens")
# Create new embedding layer with truncated vocabulary
embedding_dim = embedding_layer.weight.shape[1]
new_embedding = torch.nn.Embedding(new_vocab_size, embedding_dim)
# Copy weights for the tokens we're keeping
with torch.no_grad():
new_embedding.weight.data = embedding_layer.weight[:new_vocab_size].clone()
# Replace the embedding layer in the model
if hasattr(model, 'embed_tokens'):
model.embed_tokens = new_embedding
elif hasattr(model.model, 'embed_tokens'):
model.model.embed_tokens = new_embedding
# Also update the output layer if it exists (for language modeling head)
if hasattr(model, 'lm_head') and model.lm_head.weight.shape[0] == original_vocab_size:
new_lm_head = torch.nn.Linear(model.lm_head.in_features, new_vocab_size, bias=model.lm_head.bias is not None)
with torch.no_grad():
new_lm_head.weight.data = model.lm_head.weight[:new_vocab_size].clone()
if model.lm_head.bias is not None:
new_lm_head.bias.data = model.lm_head.bias[:new_vocab_size].clone()
model.lm_head = new_lm_head
# Update model config if it exists
if hasattr(model, 'config') and hasattr(model.config, 'vocab_size'):
model.config.vocab_size = new_vocab_size
print(f"ā
Successfully truncated embeddings to {new_vocab_size} tokens")
if push_to_hub:
repo_id = output_path
print(f"š Pushing model to Hugging Face Hub at {repo_id} ...")
# Ensure repository exists (this is idempotent)
api = HfApi(token=token)
create_repo(repo_id, private=private, exist_ok=True, token=token)
# Push model and tokenizer
model.push_to_hub(repo_id, commit_message=commit_message, token=token)
tokenizer.push_to_hub(repo_id, commit_message=commit_message, token=token)
print(f"ā
Successfully pushed to https://huggingface.co/{repo_id}")
else:
save_dir = Path(output_path)
save_dir.mkdir(parents=True, exist_ok=True)
print(f"š¾ Saving model locally to {save_dir.resolve()} ...")
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)
print("ā
Model saved locally.")
# Usage
if __name__ == "__main__":
model_path = "Qwen/Qwen3-4B" # or your local path
# You can also provide custom test messages
custom_messages = [
[{"role": "user", "content": "Solve this math problem: If a train travels 60 mph for 2 hours, how far does it go?"}],
[{"role": "user", "content": "What are the benefits of renewable energy?"}]
]
remove_thinking_tokens_and_predict(model_path, custom_messages)
#save_model_to_hf(model_path, "Qwen3-NoThinkEmbed", tokenizer_path="NoThinkQwen3", push_to_hub=True)
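# A local-save variant of the call above (illustrative output directory); push_to_hub=False writes to disk instead of the Hub:
# save_model_to_hf(model_path, "./Qwen3-NoThinkEmbed-local", tokenizer_path="NoThinkQwen3", push_to_hub=False)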