import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from pathlib import Path
from huggingface_hub import HfApi, create_repo

# NOTE: If you are trying to reproduce this result, you will need to do the following first:
# Go to the tokenizer_config.json and tokenizer.json for Qwen/Qwen3-4B in
# .cache/huggingface/hub/models--Qwen--Qwen3-4B/
# Remove the <think> and </think> tokens in those files.
# Otherwise the tokenizer might tokenize the thinking tokens as <think> and </think>, which are special tokens.
# (A scripted sketch of this edit is included near the bottom of this file.)


def remove_thinking_tokens_and_predict(model_path, test_messages=None):
    """
    Remove thinking tokens from model embeddings and run predictions with the chat template.

    Consists of 3 steps:
    1. Go into tokenizer_config.json and tokenizer.json and remove the thinking tokens
       a) vocab.json and merges.txt do not contain any mention of the thinking tokens, so no need to edit those
    2. Create a new embedding layer with a truncated vocabulary
       a) Remove the thinking tokens from the embedding layer
       b) This is done by truncating the embedding matrix so the highest remaining token ID is the minimum thinking token ID - 1
       c) Since the thinking tokens are the last tokens in the vocabulary, this has no adverse effect on the model's performance
    3. Run predictions with the chat template
    """
    # 1. Load model and tokenizer
    print("Loading model and tokenizer...")
    model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto", torch_dtype=torch.bfloat16)
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # 2. Get thinking token IDs
    thinking_tokens = ["<think>\n\n", "</think>\n\n"]
    thinking_token_ids = [151667, 151668]
    pre_len = len(thinking_token_ids)
    vocab = tokenizer.get_vocab()
    for token in thinking_tokens:
        if token in vocab:
            token_id = vocab[token]
            thinking_token_ids.append(token_id)
            print(f"Found {token} with ID {token_id}")
    print(f"Found {len(thinking_token_ids) - pre_len} additional thinking tokens")

    if not thinking_token_ids:
        print("No thinking tokens found!")
    else:
        # 3. Create new embedding layer with truncated vocabulary
        print("Creating new embedding layer with truncated vocabulary...")
        embedding_layer = None
        if hasattr(model, 'embed_tokens'):
            embedding_layer = model.embed_tokens
        elif hasattr(model.model, 'embed_tokens'):
            embedding_layer = model.model.embed_tokens

        if embedding_layer:
            # Find the minimum thinking token ID to determine the truncation point.
            # This works because the thinking tokens sit at the last used positions in the
            # embedding layer; the rows above them are padding added for training efficiency.
            min_thinking_id = min(thinking_token_ids)
            original_vocab_size = embedding_layer.weight.shape[0]
            tokenizer_vocab_size = len(tokenizer.get_vocab())
            new_vocab_size = min_thinking_id
            print(f"Original embedding size: {original_vocab_size}")
            print(f"Tokenizer vocab size: {tokenizer_vocab_size}")
            print(f"Difference: {original_vocab_size - tokenizer_vocab_size}")
            print(f"New vocab size: {new_vocab_size}")
            print(f"Removing {original_vocab_size - new_vocab_size} tokens")

            # Check what tokens exist beyond the tokenizer vocab
            vocab = tokenizer.get_vocab()
            max_tokenizer_id = max(vocab.values()) if vocab else 0
            print(f"Max token ID in tokenizer: {max_tokenizer_id}")
            if original_vocab_size > max_tokenizer_id + 1:
                print(f"There are {original_vocab_size - max_tokenizer_id - 1} embedding slots beyond the tokenizer vocab")
                print("These might be:")
                print("- Padding for computational efficiency")
                print("- Reserved slots for future tokens")
                print("- Unused embeddings from model training")

                # Check whether the tokens beyond the tokenizer vocab can be decoded
                print("\nTrying to decode tokens beyond tokenizer vocab:")
                for token_id in range(max_tokenizer_id + 1, min(max_tokenizer_id + 10, original_vocab_size)):
                    try:
                        decoded = tokenizer.decode([token_id])
                        print(f"Token {token_id}: '{decoded}'")
                    except Exception as e:
                        print(f"Token {token_id}: Cannot decode ({e})")

            # Create new embedding layer with truncated vocabulary
            embedding_dim = embedding_layer.weight.shape[1]
            new_embedding = torch.nn.Embedding(new_vocab_size, embedding_dim)

            # Copy weights for the tokens we're keeping
            with torch.no_grad():
                new_embedding.weight.data = embedding_layer.weight[:new_vocab_size].clone()

            # Replace the embedding layer in the model
            if hasattr(model, 'embed_tokens'):
                model.embed_tokens = new_embedding
                print("Replaced model.embed_tokens")
                print(f"New embedding layer shape: {model.embed_tokens.weight.shape}")
            elif hasattr(model.model, 'embed_tokens'):
                model.model.embed_tokens = new_embedding
                print("Replaced model.model.embed_tokens")
                print(f"New embedding layer shape: {model.model.embed_tokens.weight.shape}")

            # Also update the output layer (language modeling head) if it exists
            if hasattr(model, 'lm_head') and model.lm_head.weight.shape[0] == original_vocab_size:
                print("Updating language modeling head...")
                new_lm_head = torch.nn.Linear(model.lm_head.in_features, new_vocab_size,
                                              bias=model.lm_head.bias is not None)
                with torch.no_grad():
                    new_lm_head.weight.data = model.lm_head.weight[:new_vocab_size].clone()
                    if model.lm_head.bias is not None:
                        new_lm_head.bias.data = model.lm_head.bias[:new_vocab_size].clone()
                model.lm_head = new_lm_head
                print("Updated lm_head")
                print(f"New lm_head shape: {model.lm_head.weight.shape}")
            else:
                if hasattr(model, 'lm_head'):
                    print(f"LM head not updated - original shape: {model.lm_head.weight.shape}")
                else:
                    print("No lm_head found in model")

            # Update model config if it exists
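            # Keeping config.vocab_size in sync with the truncated weights matters if the
            # model is later saved or pushed: save_pretrained() writes config.json from
            # model.config, and a stale vocab_size would disagree with the new weight shapes.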
            if hasattr(model, 'config') and hasattr(model.config, 'vocab_size'):
                model.config.vocab_size = new_vocab_size
                print(f"Updated model config vocab_size to {new_vocab_size}")

            print(f"āœ… Successfully created new embedding layer!")
            print(f"Vocabulary truncated from {original_vocab_size} to {new_vocab_size} tokens")
            print("Note: Since you've manually updated the JSON files, the tokenizer will automatically use the new vocab.")
        else:
            print("Could not find embedding layer to modify")

    # 4. Verification - check how thinking tokens are now encoded
    print("\nšŸ” Verification:")
    vocab = tokenizer.get_vocab()
    print(f"Vocabulary size: {len(vocab)}")

    # Find and decode the last token (highest ID)
    max_token_id = max(vocab.values())
    last_token = tokenizer.decode([max_token_id])
    print(f"Last token (ID {max_token_id}): '{last_token}'")

    for token in thinking_tokens:
        encoded = tokenizer.encode(token, add_special_tokens=False)
        print(f"'{token}' encodes as: {encoded}")

    # 4.5. Display chat template information
    print("\nšŸ“‹ Chat Template Information:")
    if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template:
        print("Chat template found:")
        print("-" * 50)
        print(tokenizer.chat_template)
        print("-" * 50)

    # 5. Run predictions with chat template
    print("\nšŸ¤– Running predictions...")

    # Default test messages if none provided
    if test_messages is None:
        test_messages = [
            [{"role": "user", "content": "What is 2+2? Think step by step."}],
            [{"role": "user", "content": "Explain quantum computing in simple terms."}],
            [{"role": "user", "content": "Write a short poem about AI."}]
        ]

    model.eval()
    print(test_messages)
    for i, messages in enumerate(test_messages):
        print(f"\n--- Test {i+1} ---")
        print(messages)
        print(f"Input: {messages[-1]['content']}")

        # Apply chat template
        if hasattr(tokenizer, 'apply_chat_template'):
            formatted_input = tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=True
            )
            print(f"Formatted input: {formatted_input}...")

            # Tokenize
            inputs = tokenizer(formatted_input, return_tensors="pt")
            print(inputs)

            # Generate
            with torch.no_grad():
                # Move inputs to the model's device (device_map="auto" has already placed the model)
                inputs = inputs.to(model.device)
                print(f"Running inference on device: {next(model.parameters()).device}")
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=200,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=tokenizer.eos_token_id
                )

            # Decode response
            response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[-1]:], skip_special_tokens=True)
            print(f"Response: {response}")

            # Check if thinking tokens appear in the response
            has_thinking = any(token in response for token in thinking_tokens)
            print(f"Contains thinking tokens: {has_thinking}")
        else:
            print("No chat template available for this tokenizer")

    print("\nāœ… Prediction testing completed!")


def save_model_to_hf(
    model_path: str,
    output_path: str,
    *,
    tokenizer_path: str | None = None,
    truncate_thinking_tokens: bool = True,
    push_to_hub: bool = True,
    private: bool = False,
    commit_message: str = "Upload model",
    token: str | None = None,
):
    """
    Save the (optionally stripped) model and tokenizer either locally or directly to the Hugging Face Hub.

    Parameters
    ----------
    model_path : str
        Path or repo ID from which to load the model.
    output_path : str
        Local directory to save to OR a repo id like ``username/model_name`` to push to the Hub.
    tokenizer_path : str | None
        Path to a tokenizer directory. If provided, the tokenizer from this directory
        will be used when saving/pushing.
    truncate_thinking_tokens : bool, default=True
        Whether to remove thinking tokens from the model embeddings before saving.
    push_to_hub : bool, default=True
        When True, ``output_path`` is interpreted as a Hub repo id and we push the artefacts.
        When False, we always save to a local directory at ``output_path``.
    private : bool, default=False
        If pushing, create the repository as private.
    commit_message : str, default="Upload model"
        Commit message to use when pushing.
    token : str | None
        A Hugging Face access token. If ``None`` we fall back to the one stored by
        ``huggingface-cli login``.
    """
    print(f"šŸ”„ Loading model from {model_path}...")
    model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto", torch_dtype=torch.bfloat16)

    # Prefer the tokenizer from `tokenizer_path` if provided (e.g. one already stripped of the <think>/</think> tokens)
    tokenizer_source = tokenizer_path if tokenizer_path is not None else model_path
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_source)
    if tokenizer_path is not None:
        print(f"šŸ—ƒļø Loaded tokenizer from {tokenizer_path} (will be used when saving/pushing).")

    # Apply thinking token truncation if requested
    if truncate_thinking_tokens:
        print("šŸ”Ŗ Truncating thinking tokens from embeddings...")
        thinking_token_ids = [151667, 151668]

        embedding_layer = None
        if hasattr(model, 'embed_tokens'):
            embedding_layer = model.embed_tokens
        elif hasattr(model.model, 'embed_tokens'):
            embedding_layer = model.model.embed_tokens

        if embedding_layer and thinking_token_ids:
            min_thinking_id = min(thinking_token_ids)
            original_vocab_size = embedding_layer.weight.shape[0]
            new_vocab_size = min_thinking_id
            print(f"Truncating vocabulary from {original_vocab_size} to {new_vocab_size} tokens")

            # Create new embedding layer with truncated vocabulary
            embedding_dim = embedding_layer.weight.shape[1]
            new_embedding = torch.nn.Embedding(new_vocab_size, embedding_dim)

            # Copy weights for the tokens we're keeping
            with torch.no_grad():
                new_embedding.weight.data = embedding_layer.weight[:new_vocab_size].clone()

            # Replace the embedding layer in the model
            if hasattr(model, 'embed_tokens'):
                model.embed_tokens = new_embedding
            elif hasattr(model.model, 'embed_tokens'):
                model.model.embed_tokens = new_embedding

            # Also update the output layer (language modeling head) if it exists
            if hasattr(model, 'lm_head') and model.lm_head.weight.shape[0] == original_vocab_size:
                new_lm_head = torch.nn.Linear(model.lm_head.in_features, new_vocab_size,
                                              bias=model.lm_head.bias is not None)
                with torch.no_grad():
                    new_lm_head.weight.data = model.lm_head.weight[:new_vocab_size].clone()
                    if model.lm_head.bias is not None:
                        new_lm_head.bias.data = model.lm_head.bias[:new_vocab_size].clone()
                model.lm_head = new_lm_head

            # Update model config if it exists
            if hasattr(model, 'config') and hasattr(model.config, 'vocab_size'):
                model.config.vocab_size = new_vocab_size

            print(f"āœ… Successfully truncated embeddings to {new_vocab_size} tokens")

    if push_to_hub:
        repo_id = output_path
        print(f"🌐 Pushing model to Hugging Face Hub at {repo_id} ...")

        # Ensure the repository exists (this is idempotent)
        api = HfApi(token=token)
        create_repo(repo_id, private=private, exist_ok=True, token=token)

        # Push model and tokenizer
        model.push_to_hub(repo_id, commit_message=commit_message, token=token)
        tokenizer.push_to_hub(repo_id, commit_message=commit_message, token=token)
        print(f"āœ… Successfully pushed to https://huggingface.co/{repo_id}")
    else:
        save_dir = Path(output_path)
        save_dir.mkdir(parents=True, exist_ok=True)
        print(f"šŸ’¾ Saving model locally to {save_dir.resolve()} ...")
        model.save_pretrained(save_dir)
        tokenizer.save_pretrained(save_dir)
        print("āœ… Model saved locally.")
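

# ---------------------------------------------------------------------------
# Optional helper, not part of the original workflow above (which edits the
# files by hand): a minimal sketch of how the manual tokenizer edit described
# in the NOTE at the top of this file could be scripted. It assumes the
# standard fast-tokenizer layout where <think> and </think> appear as entries
# under "added_tokens_decoder" in tokenizer_config.json and under
# "added_tokens" in tokenizer.json; adjust the keys if your snapshot differs.
# The function name is new here and not referenced by the code above.
# ---------------------------------------------------------------------------
def strip_thinking_tokens_from_tokenizer_files(tokenizer_dir, tokens=("<think>", "</think>")):
    """Remove the thinking special tokens from tokenizer_config.json and tokenizer.json in place."""
    tokenizer_dir = Path(tokenizer_dir)

    # tokenizer_config.json: drop the matching entries from "added_tokens_decoder"
    config_path = tokenizer_dir / "tokenizer_config.json"
    config = json.loads(config_path.read_text(encoding="utf-8"))
    added = config.get("added_tokens_decoder", {})
    config["added_tokens_decoder"] = {
        token_id: entry for token_id, entry in added.items()
        if entry.get("content") not in tokens
    }
    config_path.write_text(json.dumps(config, indent=2, ensure_ascii=False), encoding="utf-8")

    # tokenizer.json: drop the matching entries from the "added_tokens" list
    tok_path = tokenizer_dir / "tokenizer.json"
    tok = json.loads(tok_path.read_text(encoding="utf-8"))
    tok["added_tokens"] = [
        entry for entry in tok.get("added_tokens", [])
        if entry.get("content") not in tokens
    ]
    tok_path.write_text(json.dumps(tok, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"Stripped {tokens} from tokenizer files in {tokenizer_dir}")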
"__main__": model_path = "Qwen/Qwen3-4B" # or your local path # You can also provide custom test messages custom_messages = [ [{"role": "user", "content": "Solve this math problem: If a train travels 60 mph for 2 hours, how far does it go?"}], [{"role": "user", "content": "What are the benefits of renewable energy?"}] ] remove_thinking_tokens_and_predict(model_path, custom_messages) #save_model_to_hf(model_path, "Qwen3-NoThinkEmbed", tokenizer_path="NoThinkQwen3", push_to_hub=True)