# translit-demo / train.py
import io
import json
import math
import os
import random
import sys
import tempfile
import zipfile
from collections import Counter, defaultdict

import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
# sys.stdout = open("logs/train.txt", "w", encoding="utf-8")
# sys.stderr = sys.stdout
# 2️⃣ Clean and standardize JSON files - PRESERVE ALL FIELDS
def clean_json_file(infile, outfile):
    data = []
    with open(infile, "r", encoding="utf-8") as f:
        for line in f:
            try:
                obj = json.loads(line.strip())
                eng = obj.get("english word") or obj.get("english_word")
                native = obj.get("native word") or obj.get("native_word")
                # Strip whitespace and filter out empty strings
                if eng:
                    eng = eng.strip()
                if native:
                    native = native.strip()
                # Only keep the pair if both fields are non-empty after stripping
                if eng and native:
                    # Preserve all fields
                    score = obj.get("score")
                    if score is None:
                        score = float("nan")
                    cleaned_obj = {
                        "english word": eng,
                        "native word": native,
                        "source": obj.get("source", "Unknown"),
                        "score": score,
                        "unique_identifier": obj.get("unique_identifier"),
                    }
                    data.append(cleaned_obj)
            except Exception:
                # Skip malformed JSON lines
                continue
    with open(outfile, "w", encoding="utf-8") as f:
        for entry in data:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    print(f"✅ Cleaned {len(data)} entries in {os.path.basename(infile)}")
def sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3, seed=42):
    """
    Sample a subset of a transliteration dataset.

    Parameters:
    - full_dataset: list of dicts, each with 'english word' and 'native word'
    - sample_size: total number of samples to return
    - top_freq_ratio: fraction of samples drawn from the most frequent words
    - seed: random seed for reproducibility

    Returns:
    - sampled_dataset: list of dicts
    """
    random.seed(seed)
    # -----------------------------
    # 1️⃣ Top frequent words
    # -----------------------------
    words = [item['english word'] for item in full_dataset]
    freq = Counter(words)
    sorted_items = sorted(full_dataset, key=lambda x: freq[x['english word']], reverse=True)
    num_top = int(sample_size * top_freq_ratio)
    top_items = sorted_items[:num_top]
    # Remaining items for stratified sampling
    remaining_size = sample_size - num_top
    remaining_items = sorted_items[num_top:]
    # -----------------------------
    # 2️⃣ Stratified by word length
    # -----------------------------
    length_groups = defaultdict(list)
    for item in remaining_items:
        length_groups[len(item['english word'])].append(item)
    sampled_remaining = []
    total_remaining_items = sum(len(v) for v in length_groups.values())
    if total_remaining_items > 0:
        for length, items in length_groups.items():
            # Allocate slots proportionally to each length bucket's share
            n = int(remaining_size * len(items) / total_remaining_items)
            n = min(n, len(items))
            sampled_remaining.extend(random.sample(items, n))
    # -----------------------------
    # 3️⃣ Combine and shuffle
    # -----------------------------
    sampled_dataset = top_items + sampled_remaining
    if len(sampled_dataset) > sample_size:
        sampled_dataset = random.sample(sampled_dataset, sample_size)
    random.shuffle(sampled_dataset)
    return sampled_dataset
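
# Minimal sketch (not invoked anywhere) of the sampler on fabricated toy data.
# Note that the per-length allocation floors, so the result can come out smaller
# than sample_size when the length buckets are tiny.
def _demo_sampler():
    toy = [{'english word': w, 'native word': w}
           for w in ['aa', 'aa', 'aa', 'bb', 'ccc', 'dddd', 'eeeee']]
    sampled = sample_transliteration_dataset(toy, sample_size=4, top_freq_ratio=0.5)
    # Two slots go to the most frequent word ('aa'); the stratified remainder
    # rounds down to zero here, so only 2 items come back.
    print(len(sampled), [item['english word'] for item in sampled])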
# 4️⃣ DATA ANALYSIS FUNCTION
def analyze_dataset_statistics(dataset_split, split_name="train"):
    """Analyze dataset statistics by source and length"""
    print(f"\n{'='*70}")
    print(f"DATASET STATISTICS - {split_name.upper()} SPLIT")
    print(f"{'='*70}\n")
    data_list = list(dataset_split)
    print(f"Total samples: {len(data_list):,}\n")
    # Group by source
    source_stats = defaultdict(lambda: {
        'count': 0,
        'english_lengths': [],
        'native_lengths': [],
    })
    for item in data_list:
        english_word = item.get('english word', '')
        native_word = item.get('native word', '')
        source = item.get('source', 'Unknown')
        source_stats[source]['count'] += 1
        source_stats[source]['english_lengths'].append(len(english_word))
        source_stats[source]['native_lengths'].append(len(native_word))
    # Compute statistics per source
    stats_list = []
    for source, data in sorted(source_stats.items(), key=lambda x: x[1]['count'], reverse=True):
        eng_lengths = data['english_lengths']
        nat_lengths = data['native_lengths']
        if eng_lengths and nat_lengths:
            stats_list.append({
                'Source': source,
                'Count': data['count'],
                'Percentage': f"{100 * data['count'] / len(data_list):.2f}%",
                'Eng_Min': min(eng_lengths),
                'Eng_Max': max(eng_lengths),
                'Eng_Mean': f"{np.mean(eng_lengths):.2f}",
                'Eng_Median': f"{np.median(eng_lengths):.1f}",
                'Nat_Min': min(nat_lengths),
                'Nat_Max': max(nat_lengths),
                'Nat_Mean': f"{np.mean(nat_lengths):.2f}",
                'Nat_Median': f"{np.median(nat_lengths):.1f}",
            })
    stats_df = pd.DataFrame(stats_list)
    print("STATISTICS BY SOURCE:")
    print(stats_df.to_string(index=False))
    print()
    # Overall length distribution
    all_eng_lengths = [len(item.get('english word', '')) for item in data_list]
    all_nat_lengths = [len(item.get('native word', '')) for item in data_list]
    print("OVERALL LENGTH DISTRIBUTION:")
    print(f"English - Min: {min(all_eng_lengths)}, Max: {max(all_eng_lengths)}, "
          f"Mean: {np.mean(all_eng_lengths):.2f}, Median: {np.median(all_eng_lengths):.1f}")
    print(f"Native  - Min: {min(all_nat_lengths)}, Max: {max(all_nat_lengths)}, "
          f"Mean: {np.mean(all_nat_lengths):.2f}, Median: {np.median(all_nat_lengths):.1f}")
    print()
    # Length buckets
    print("LENGTH DISTRIBUTION (English words):")
    length_buckets = {
        '1-3': 0, '4-6': 0, '7-10': 0, '11-15': 0, '16-20': 0, '21+': 0
    }
    for length in all_eng_lengths:
        if length <= 3:
            length_buckets['1-3'] += 1
        elif length <= 6:
            length_buckets['4-6'] += 1
        elif length <= 10:
            length_buckets['7-10'] += 1
        elif length <= 15:
            length_buckets['11-15'] += 1
        elif length <= 20:
            length_buckets['16-20'] += 1
        else:
            length_buckets['21+'] += 1
    for bucket, count in length_buckets.items():
        print(f"  {bucket:6s}: {count:6,} ({100*count/len(data_list):5.2f}%)")
    print(f"\n{'='*70}\n")
    return stats_df
# =======================
# 1. CHARACTER-LEVEL TOKENIZER
# =======================
class CharTokenizer:
    """Character-level tokenizer for transliteration"""
    def __init__(self, vocab=None):
        self.pad_token = '<PAD>'
        self.sos_token = '<SOS>'
        self.eos_token = '<EOS>'
        self.unk_token = '<UNK>'
        if vocab is None:
            self.char2idx = {
                self.pad_token: 0,
                self.sos_token: 1,
                self.eos_token: 2,
                self.unk_token: 3
            }
        else:
            self.char2idx = vocab
        self.idx2char = {v: k for k, v in self.char2idx.items()}

    def fit(self, texts):
        """Build vocabulary from texts"""
        char_counts = Counter()
        for text in texts:
            char_counts.update(text)
        # Add characters to vocabulary (sorted for consistency)
        for char, _ in sorted(char_counts.items()):
            if char not in self.char2idx:
                self.char2idx[char] = len(self.char2idx)
        self.idx2char = {v: k for k, v in self.char2idx.items()}
        return self

    def encode(self, text, add_special_tokens=True):
        """Convert text to indices"""
        if add_special_tokens:
            indices = [self.char2idx[self.sos_token]]
            indices.extend([self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text])
            indices.append(self.char2idx[self.eos_token])
        else:
            indices = [self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text]
        return indices

    def decode(self, indices, skip_special_tokens=True):
        """Convert indices back to text"""
        chars = []
        for idx in indices:
            # handle both ints and tensors
            if isinstance(idx, torch.Tensor):
                idx = idx.item()
            char = self.idx2char.get(idx, self.unk_token)
            if skip_special_tokens and char in [self.pad_token, self.sos_token, self.eos_token]:
                continue
            chars.append(char)
        return ''.join(chars)

    def __len__(self):
        return len(self.char2idx)
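
# Minimal sketch (not invoked anywhere): fit a CharTokenizer on two words and
# round-trip one of them through encode/decode.
def _demo_char_tokenizer():
    tok = CharTokenizer().fit(["namaste", "bharat"])
    ids = tok.encode("namaste")   # <SOS> + one id per character + <EOS>
    print(ids)
    print(tok.decode(ids))        # 'namaste' (special tokens skipped)
    print(tok.decode(tok.encode("xyz")))  # unseen characters decode as '<UNK>'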
# =======================
# 2. DATASET CLASS
# =======================
class TransliterationDataset(Dataset):
    """Dataset for transliteration task"""
    def __init__(self, data, src_tokenizer, tgt_tokenizer, max_len=50):
        # data: an indexable collection of dict-like objects with 'english word' and 'native word'
        self.data = data
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        src = item['english word']
        tgt = item['native word']
        # Encode
        src_ids = self.src_tokenizer.encode(src)
        tgt_ids = self.tgt_tokenizer.encode(tgt)
        # Truncate if needed
        src_ids = src_ids[:self.max_len]
        tgt_ids = tgt_ids[:self.max_len]
        return {
            'src_ids': torch.tensor(src_ids, dtype=torch.long),
            'tgt_ids': torch.tensor(tgt_ids, dtype=torch.long),
            'src_text': src,
            'tgt_text': tgt
        }
def collate_fn(batch):
    """Custom collate function to pad sequences"""
    src_ids = [item['src_ids'] for item in batch]
    tgt_ids = [item['tgt_ids'] for item in batch]
    # Pad sequences
    src_ids = nn.utils.rnn.pad_sequence(src_ids, batch_first=True, padding_value=0)
    tgt_ids = nn.utils.rnn.pad_sequence(tgt_ids, batch_first=True, padding_value=0)
    return {
        'src_ids': src_ids,
        'tgt_ids': tgt_ids,
        'src_text': [item['src_text'] for item in batch],
        'tgt_text': [item['tgt_text'] for item in batch]
    }
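
# Minimal sketch (not invoked anywhere): collate_fn pads every sequence in the
# batch to the longest one with PAD id 0.
def _demo_collate():
    tok = CharTokenizer().fit(["ab", "abcd"])
    pairs = [{'english word': 'ab', 'native word': 'ab'},
             {'english word': 'abcd', 'native word': 'abcd'}]
    ds = TransliterationDataset(pairs, tok, tok)
    batch = collate_fn([ds[0], ds[1]])
    print(batch['src_ids'].shape)  # torch.Size([2, 6]): 4 chars + <SOS>/<EOS>
    print(batch['src_ids'][0])     # the shorter word is right-padded with zeros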
# =======================
# 3. LSTM ENCODER-DECODER WITH ATTENTION
# =======================
class Encoder(nn.Module):
    """LSTM Encoder"""
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0,
                            bidirectional=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell
class Attention(nn.Module):
    """Bahdanau Attention Mechanism"""
    def __init__(self, dec_hidden_dim, enc_hidden_dim):
        super().__init__()
        self.attn = nn.Linear(dec_hidden_dim + enc_hidden_dim*2, dec_hidden_dim)
        self.v = nn.Linear(dec_hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs, mask=None):
        src_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        if mask is not None:
            attention = attention.masked_fill(mask == 0, -1e10)
        attn_weights = torch.softmax(attention, dim=1)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)
        return context.squeeze(1), attn_weights.squeeze(1)
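
# Minimal sketch (not invoked anywhere): attention weights over a random encoder
# output sum to 1 along the source axis, and the context vector has the
# bidirectional encoder width (enc_hidden_dim * 2).
def _demo_attention():
    attn = Attention(dec_hidden_dim=8, enc_hidden_dim=8)
    hidden = torch.randn(2, 8)        # (batch, dec_hidden)
    enc_out = torch.randn(2, 5, 16)   # (batch, src_len, enc_hidden*2)
    context, weights = attn(hidden, enc_out)
    print(context.shape, weights.shape)  # torch.Size([2, 16]) torch.Size([2, 5])
    print(weights.sum(dim=1))            # ~tensor([1., 1.])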
class Decoder(nn.Module):
    """LSTM Decoder with Attention"""
    def __init__(self, vocab_size, embed_dim, enc_hidden_dim, dec_hidden_dim, num_layers=2, dropout=0.3):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.attention = Attention(dec_hidden_dim, enc_hidden_dim)
        self.lstm = nn.LSTM(embed_dim + enc_hidden_dim*2, dec_hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc_out = nn.Linear(dec_hidden_dim + enc_hidden_dim*2 + embed_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.enc_hidden_dim = enc_hidden_dim
        self.dec_hidden_dim = dec_hidden_dim

    def forward(self, input, hidden, cell, encoder_outputs, mask=None):
        input = input.unsqueeze(1)
        embedded = self.dropout(self.embedding(input))
        # Attend over encoder outputs using the top decoder layer's hidden state
        context, attn_weights = self.attention(hidden[-1], encoder_outputs, mask)
        context = context.unsqueeze(1)
        rnn_input = torch.cat((embedded, context), dim=2)
        output, (hidden, cell) = self.lstm(rnn_input, (hidden, cell))
        output = output.squeeze(1)
        embedded = embedded.squeeze(1)
        context = context.squeeze(1)
        prediction = self.fc_out(torch.cat((output, context, embedded), dim=1))
        return prediction, hidden, cell, attn_weights
class Seq2Seq(nn.Module):
    """Complete Sequence-to-Sequence Model"""
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        enc_hidden_dim = encoder.lstm.hidden_size
        dec_hidden_dim = decoder.dec_hidden_dim
        # Project the concatenated forward/backward encoder states down to the
        # unidirectional decoder's hidden size
        self.hidden_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)
        self.cell_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)

    def create_mask(self, src):
        # True wherever src is a real token (PAD id is 0)
        mask = (src != 0)
        return mask

    def _bridge_encoder_state(self, hidden, cell, batch_size):
        """Merge the bidirectional encoder's final states for the decoder.

        The encoder returns states shaped (num_layers * 2, batch, enc_hidden);
        regroup to (num_layers, 2, batch, enc_hidden), concatenate the
        forward/backward halves, then project to the decoder's hidden size.
        """
        num_layers = self.decoder.lstm.num_layers
        hidden = hidden.view(num_layers, 2, batch_size, -1)
        cell = cell.view(num_layers, 2, batch_size, -1)
        hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2)
        cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2)
        return self.hidden_projection(hidden), self.cell_projection(cell)

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        tgt_len = tgt.shape[1]
        tgt_vocab_size = self.decoder.vocab_size
        outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size).to(self.device)
        encoder_outputs, hidden, cell = self.encoder(src)
        hidden, cell = self._bridge_encoder_state(hidden, cell, batch_size)
        mask = self.create_mask(src)
        input = tgt[:, 0]  # <SOS>
        for t in range(1, tgt_len):
            output, hidden, cell, attn_weights = self.decoder(
                input, hidden, cell, encoder_outputs, mask
            )
            outputs[:, t] = output
            # With probability teacher_forcing_ratio, feed the gold token next
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = tgt[:, t] if teacher_force else top1
        return outputs

    def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50):
        """Translate a single source sequence (greedy decoding)"""
        self.eval()
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device)
            else:
                src_tensor = src.to(self.device)
            encoder_outputs, hidden, cell = self.encoder(src_tensor)
            hidden, cell = self._bridge_encoder_state(hidden, cell, batch_size=1)
            mask = self.create_mask(src_tensor)
            input = torch.tensor([tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]]).to(self.device)
            outputs = []
            for _ in range(max_len):
                output, hidden, cell, _ = self.decoder(input, hidden, cell, encoder_outputs, mask)
                top1 = output.argmax(1)
                outputs.append(top1.item())
                if top1.item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]:
                    break
                input = top1
            return tgt_tokenizer.decode(outputs, skip_special_tokens=True)

    def beam_search_decode(self, src, src_tokenizer, tgt_tokenizer, max_len=50, beam_width=3):
        """Beam search decoding for Seq2Seq"""
        self.eval()
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device)
            else:
                src_tensor = src.to(self.device)
            encoder_outputs, hidden, cell = self.encoder(src_tensor)
            hidden, cell = self._bridge_encoder_state(hidden, cell, batch_size=1)
            mask = self.create_mask(src_tensor)
            start_token = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]
            # Each beam: (token sequence, cumulative log-prob, decoder hidden, decoder cell)
            beams = [(torch.tensor([start_token], device=self.device), 0.0, hidden, cell)]
            completed_sequences = []
            for _ in range(max_len):
                new_beams = []
                for seq, log_prob, h, c in beams:
                    input_token = seq[-1].unsqueeze(0)
                    output, h_new, c_new, _ = self.decoder(input_token, h, c, encoder_outputs, mask)
                    probs = torch.log_softmax(output, dim=1).squeeze(0)
                    topk_probs, topk_idx = probs.topk(beam_width)
                    for prob, idx in zip(topk_probs, topk_idx):
                        new_seq = torch.cat([seq, idx.unsqueeze(0)])
                        new_log_prob = log_prob + prob.item()
                        new_beams.append((new_seq, new_log_prob, h_new, c_new))
                new_beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]
                beams = []
                for seq, log_prob, h, c in new_beams:
                    if seq[-1].item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]:
                        completed_sequences.append((seq, log_prob))
                    else:
                        beams.append((seq, log_prob, h, c))
                if not beams:
                    break
            if len(completed_sequences) == 0:
                completed_sequences = beams
            best_seq = max(completed_sequences, key=lambda x: x[1])[0]
            return tgt_tokenizer.decode(best_seq[1:], skip_special_tokens=True)
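
# Minimal sketch (not invoked anywhere) of the encoder->decoder state bridge: a
# 2-layer bidirectional encoder emits states shaped (num_layers*2, batch, hidden),
# which _bridge_encoder_state merges and projects to (num_layers, batch, dec_hidden)
# for the unidirectional decoder.
def _demo_state_bridge():
    enc = Encoder(vocab_size=10, embed_dim=8, hidden_dim=16, num_layers=2)
    dec = Decoder(vocab_size=10, embed_dim=8, enc_hidden_dim=16, dec_hidden_dim=16, num_layers=2)
    model = Seq2Seq(enc, dec, device=torch.device('cpu'))
    src = torch.randint(1, 10, (3, 7))  # batch of 3 sequences of length 7
    enc_out, h, c = enc(src)
    print(h.shape)  # torch.Size([4, 3, 16]) = (layers*2, batch, hidden)
    h, c = model._bridge_encoder_state(h, c, batch_size=3)
    print(h.shape)  # torch.Size([2, 3, 16]) = (layers, batch, dec_hidden)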
# =======================
# 4. TRAINING / EVAL HELPERS
# =======================
def train_epoch(model, dataloader, optimizer, criterion, device, clip=1.0):
    model.train()
    epoch_loss = 0
    for batch in dataloader:
        src = batch['src_ids'].to(device)
        tgt = batch['tgt_ids'].to(device)
        optimizer.zero_grad()
        output = model(src, tgt)
        # Drop position 0 (<SOS>) before computing the loss
        output = output[:, 1:].reshape(-1, output.shape[-1])
        tgt_flat = tgt[:, 1:].reshape(-1)
        loss = criterion(output, tgt_flat)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(dataloader)
def char_overlap_f1(pred, true):
    """Character-level overlap F1 per word"""
    pred_counts = Counter(pred)
    true_counts = Counter(true)
    # Multiset intersection: per-character minimum of the two counts
    overlap = sum((pred_counts & true_counts).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(true)
    return 2 * precision * recall / (precision + recall)
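
# Minimal worked example (not invoked anywhere): for pred='ram' vs true='raam',
# the multiset overlap is {r:1, a:1, m:1} = 3 characters, so precision = 3/3,
# recall = 3/4, and F1 = 2 * (1 * 0.75) / (1 + 0.75) ≈ 0.857.
def _demo_char_overlap_f1():
    print(char_overlap_f1('ram', 'raam'))  # 0.857...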
def evaluate(model, dataloader, criterion, device):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            src = batch['src_ids'].to(device)
            tgt = batch['tgt_ids'].to(device)
            # No teacher forcing
            output = model(src, tgt, teacher_forcing_ratio=0.0)
            output = output[:, 1:].reshape(-1, output.shape[-1])
            tgt_flat = tgt[:, 1:].reshape(-1)
            loss = criterion(output, tgt_flat)
            epoch_loss += loss.item()
    return epoch_loss / len(dataloader)
# =======================
# 5. ENHANCED EVALUATION WITH FULL DETAILS
# =======================
def evaluate_with_full_details(model, dataloader, src_tokenizer, tgt_tokenizer,
                               device, output_file='test_results.jsonl',
                               decoding="greedy", beam_width=3, max_samples=None):
    """
    Evaluate model and save detailed results with all metadata
    """
    model.eval()
    results = []
    word_correct = 0
    char_correct = 0
    char_total = 0
    print(f"\n{'='*70}")
    print(f"DETAILED EVALUATION - {decoding.upper()} DECODING")
    print(f"{'='*70}\n")
    with torch.no_grad():
        sample_idx = 0
        char_f1_list = []
        for batch in tqdm(dataloader, desc="Evaluating"):
            src = batch['src_ids'].to(device)
            src_texts = batch['src_text']
            tgt_texts = batch['tgt_text']
            for i in range(len(src_texts)):
                src_text = src_texts[i]
                tgt_text = tgt_texts[i]
                src_tensor = src[i].unsqueeze(0)
                # Generate prediction. The LSTM Seq2Seq has a dedicated
                # beam_search_decode method; the Transformer's translate()
                # handles both modes itself and needs the device passed in.
                if decoding == "beam" and hasattr(model, 'beam_search_decode'):
                    pred_text = model.beam_search_decode(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100, beam_width=beam_width
                    )
                elif hasattr(model, 'beam_search_decode'):
                    pred_text = model.translate(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100
                    )
                else:
                    pred_text = model.translate(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100, device=device,
                        decoding=decoding, beam_width=beam_width
                    )
                pred_text = pred_text.strip()
                tgt_text = tgt_text.strip()
                src_text = src_text.strip()
                # Compute metrics
                is_correct = (pred_text == tgt_text)
                if is_correct:
                    word_correct += 1
                min_len = min(len(pred_text), len(tgt_text))
                char_matches = sum(1 for j in range(min_len) if pred_text[j] == tgt_text[j])
                char_correct += char_matches
                char_total += len(tgt_text)
                sample_char_accuracy = char_matches / len(tgt_text) if len(tgt_text) > 0 else 0.0
                char_f1 = char_overlap_f1(pred_text, tgt_text)
                char_f1_list.append(char_f1)
                # Recover the original metadata for this item (the test loader
                # is not shuffled, so sample_idx lines up with the dataset)
                original_item = dataloader.dataset.data[sample_idx]
                source = original_item.get('source', 'Unknown')
                score = original_item.get('score')
                unique_id = original_item.get('unique_identifier') or f'sample_{sample_idx}'
                # Store complete result with all original fields
                result = {
                    'unique_identifier': unique_id,
                    'source': source,
                    'score': score,
                    'english_word': src_text,
                    'native_word': tgt_text,
                    'predicted_word': pred_text,
                    'is_correct': is_correct,
                    'english_length': len(src_text),
                    'native_length': len(tgt_text),
                    'predicted_length': len(pred_text),
                    'char_accuracy': sample_char_accuracy,
                    'char_f1': char_f1,
                    'decoding_method': decoding,
                }
                if decoding == "beam":
                    result['beam_width'] = beam_width
                results.append(result)
                sample_idx += 1
                if max_samples and len(results) >= max_samples:
                    break
            if max_samples and len(results) >= max_samples:
                break
    # Create DataFrame
    results_df = pd.DataFrame(results)
    # Calculate metrics
    total_samples = len(results)
    word_accuracy = word_correct / total_samples if total_samples > 0 else 0.0
    char_accuracy = char_correct / char_total if char_total > 0 else 0.0
    char_f1 = np.mean(char_f1_list) if len(char_f1_list) > 0 else 0.0
    print("\nOVERALL METRICS:")
    print(f"  Total Samples: {total_samples:,}")
    print(f"  Word Accuracy: {word_accuracy:.4f}")
    print(f"  Char Accuracy: {char_accuracy:.4f}")
    print(f"  Char F1:       {char_f1:.4f}\n")
    # Statistics by source
    if 'source' in results_df.columns and results_df['source'].nunique() > 1:
        print("ACCURACY BY SOURCE:")
        source_stats = results_df.groupby('source').agg({
            'is_correct': ['count', 'sum', 'mean']
        }).round(4)
        source_stats.columns = ['Count', 'Correct', 'Accuracy']
        print(source_stats.to_string())
        print()
    # Statistics by length
    results_df['length_bucket'] = pd.cut(
        results_df['english_length'],
        bins=[0, 3, 6, 10, 15, 20, 100],
        labels=['1-3', '4-6', '7-10', '11-15', '16-20', '21+']
    )
    print("ACCURACY BY LENGTH:")
    length_stats = results_df.groupby('length_bucket').agg({
        'is_correct': ['count', 'mean']
    }).round(4)
    length_stats.columns = ['Count', 'Accuracy']
    print(length_stats.to_string())
    print()
    # Show some examples
    print("SAMPLE PREDICTIONS:")
    for idx in range(min(10, len(results_df))):
        row = results_df.iloc[idx]
        ok = "✓" if row['is_correct'] else "✗"
        print(f"{ok} {row['english_word']} -> {row['predicted_word']} (expected: {row['native_word']})")
    print()
    # Save results
    os.makedirs(os.path.dirname(output_file) or '.', exist_ok=True)
    results_df.to_json(output_file, orient='records', lines=True, force_ascii=False)
    print(f"✅ Results saved to: {output_file}\n")
    metrics = {
        'total_samples': total_samples,
        'word_correct': word_correct,
        'word_accuracy': word_accuracy,
        'char_accuracy': char_accuracy,
        'char_f1': char_f1,
        'decoding_method': decoding
    }
    if decoding == "beam":
        metrics['beam_width'] = beam_width
    return results_df, metrics
# =======================
# 10. TRANSFORMER MODEL
# =======================
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding"""
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)
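
# Minimal sketch (not invoked anywhere): the positional-encoding buffer has shape
# (1, max_len, d_model) and is broadcast-added to a (batch, seq, d_model) input.
def _demo_positional_encoding():
    pe = PositionalEncoding(d_model=16, max_len=50, dropout=0.0)
    x = torch.zeros(2, 10, 16)
    out = pe(x)
    print(out.shape)      # torch.Size([2, 10, 16])
    print(out[0, 0, :4])  # sin/cos pattern at position 0: [0., 1., 0., 1.]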
class TransformerTransliterator(nn.Module):
    def __init__(self,
                 src_vocab_size,
                 tgt_vocab_size,
                 d_model=256,
                 nhead=8,
                 num_encoder_layers=2,
                 num_decoder_layers=2,
                 dim_feedforward=512,
                 dropout=0.1,
                 max_len=100):
        super().__init__()
        self.d_model = d_model
        self.src_vocab_size = src_vocab_size
        self.tgt_vocab_size = tgt_vocab_size
        # Embeddings
        self.src_embedding = nn.Embedding(src_vocab_size, d_model, padding_idx=0)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model, padding_idx=0)
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_len, dropout)
        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        # Output layer
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self._init_weights()

    def _init_weights(self):
        initrange = 0.1
        self.src_embedding.weight.data.uniform_(-initrange, initrange)
        self.tgt_embedding.weight.data.uniform_(-initrange, initrange)
        self.fc_out.bias.data.zero_()
        self.fc_out.weight.data.uniform_(-initrange, initrange)

    def generate_square_subsequent_mask(self, sz):
        """Causal mask for the decoder: True above the diagonal marks positions
        that may not be attended to (nn.Transformer accepts boolean masks)."""
        return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1)

    def create_padding_mask(self, seq, pad_idx=0):
        """Mask for padding tokens"""
        return (seq == pad_idx)

    def forward(self, src, tgt):
        tgt_len = tgt.shape[1]
        tgt_mask = self.generate_square_subsequent_mask(tgt_len).to(tgt.device)
        src_padding_mask = self.create_padding_mask(src)
        tgt_padding_mask = self.create_padding_mask(tgt)
        src_emb = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model))
        output = self.transformer(
            src_emb,
            tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask
        )
        output = self.fc_out(output)
        return output

    def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50, device=None,
                  decoding="greedy", beam_width=3):
        """Greedy or beam-search decoding"""
        self.eval()
        if device is None:
            # Fall back to wherever the model's weights live
            device = next(self.parameters()).device
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(device)
            else:
                src_tensor = src.to(device)
            sos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]
            eos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]
            if decoding == "beam":
                # Initialize beams: list of (sequence, cumulative log-prob)
                beams = [([sos_idx], 0.0)]
                for _ in range(max_len):
                    new_beams = []
                    for seq, score in beams:
                        # Carry finished beams forward unchanged
                        if seq[-1] == eos_idx:
                            new_beams.append((seq, score))
                            continue
                        tgt_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
                        output = self.forward(src_tensor, tgt_tensor)
                        logits = output[0, -1, :]
                        probs = torch.log_softmax(logits, dim=-1)
                        topk_probs, topk_indices = probs.topk(beam_width)
                        for k in range(beam_width):
                            next_seq = seq + [topk_indices[k].item()]
                            next_score = score + topk_probs[k].item()
                            new_beams.append((next_seq, next_score))
                    # Keep top beam_width sequences
                    beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]
                    # Stop once every surviving beam has ended
                    if all(seq[-1] == eos_idx for seq, _ in beams):
                        break
                best_seq = beams[0][0]
                return tgt_tokenizer.decode(best_seq, skip_special_tokens=True)
            else:
                # Greedy decoding
                tgt_indices = [sos_idx]
                for _ in range(max_len):
                    tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device)
                    output = self.forward(src_tensor, tgt_tensor)
                    next_token = output[0, -1, :].argmax().item()
                    tgt_indices.append(next_token)
                    if next_token == eos_idx:
                        break
                return tgt_tokenizer.decode(tgt_indices, skip_special_tokens=True)
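
# Minimal sketch (not invoked anywhere) of the causal decoder mask for sz=4:
# True entries (upper triangle) are future positions the decoder may not attend to.
def _demo_causal_mask():
    m = TransformerTransliterator(src_vocab_size=8, tgt_vocab_size=8)
    print(m.generate_square_subsequent_mask(4))
    # tensor([[False,  True,  True,  True],
    #         [False, False,  True,  True],
    #         [False, False, False,  True],
    #         [False, False, False, False]])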
def train_transformer_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for batch in dataloader:
        src = batch['src_ids'].to(device)
        tgt = batch['tgt_ids'].to(device)
        # Teacher forcing: the decoder sees tgt[:-1] and predicts tgt[1:]
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        optimizer.zero_grad()
        output = model(src, tgt_input)
        loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)
def evaluate_transformer(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            src = batch['src_ids'].to(device)
            tgt = batch['tgt_ids'].to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]
            output = model(src, tgt_input)
            loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1))
            total_loss += loss.item()
    return total_loss / len(dataloader)
if __name__ == "__main__":
    # 1️⃣ Download and extract Hindi Aksharantar
    url = "https://huggingface.co/datasets/ai4bharat/Aksharantar/resolve/main/hin.zip"
    resp = requests.get(url)
    resp.raise_for_status()
    tmpdir = tempfile.mkdtemp()
    with zipfile.ZipFile(io.BytesIO(resp.content), "r") as zip_ref:
        zip_ref.extractall(tmpdir)
    print("Extracted:", os.listdir(tmpdir))
    clean_dir = os.path.join(tmpdir, "cleaned")
    os.makedirs(clean_dir, exist_ok=True)
    for split in ["train", "valid", "test"]:
        clean_json_file(
            os.path.join(tmpdir, f"hin_{split}.json"),
            os.path.join(clean_dir, f"{split}.json")
        )
    # 3️⃣ Load cleaned dataset
    dataset = load_dataset(
        "json",
        data_files={
            "train": os.path.join(clean_dir, "train.json"),
            "validation": os.path.join(clean_dir, "valid.json"),
            "test": os.path.join(clean_dir, "test.json"),
        },
    )
    print(dataset)
    print(dataset["train"][0])
    # Analyze BEFORE sampling
    print("\n" + "="*80)
    print("DATA ANALYSIS - BEFORE SAMPLING")
    print("="*80)
    train_stats_before = analyze_dataset_statistics(dataset['train'], 'train (before sampling)')
    valid_stats = analyze_dataset_statistics(dataset['validation'], 'validation')
    test_stats = analyze_dataset_statistics(dataset['test'], 'test')
    # Save statistics
    os.makedirs("analysis", exist_ok=True)
    train_stats_before.to_csv('analysis/data_stats_train_before_sampling.csv', index=False)
    valid_stats.to_csv('analysis/data_stats_valid.csv', index=False)
    test_stats.to_csv('analysis/data_stats_test.csv', index=False)
    full_dataset = list(dataset['train'])
    dataset['train'] = sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3)
    print(f"\n✅ Sampled dataset size: {len(dataset['train'])}")
    print("Example entries:", dataset['train'][:5])
    # Analyze AFTER sampling
    print("\n" + "="*80)
    print("DATA ANALYSIS - AFTER SAMPLING")
    print("="*80)
    train_stats_after = analyze_dataset_statistics(dataset['train'], 'train (after sampling)')
    train_stats_after.to_csv('analysis/data_stats_train_after_sampling.csv', index=False)
    # =======================
    # 6. TOKENIZERS + DATALOADERS
    # =======================
    src_tokenizer = CharTokenizer()
    tgt_tokenizer = CharTokenizer()
    train_items = list(dataset['train'])
    valid_items = list(dataset['validation'])
    test_items = list(dataset['test'])
    # Build tokenizer vocabularies from the train split only
    src_texts = [item['english word'] for item in train_items]
    tgt_texts = [item['native word'] for item in train_items]
    src_tokenizer.fit(src_texts)
    tgt_tokenizer.fit(tgt_texts)
    print(f"\nSource vocab size: {len(src_tokenizer)}")
    print(f"Target vocab size: {len(tgt_tokenizer)}")
    train_dataset = TransliterationDataset(train_items, src_tokenizer, tgt_tokenizer)
    valid_dataset = TransliterationDataset(valid_items, src_tokenizer, tgt_tokenizer)
    test_dataset = TransliterationDataset(test_items, src_tokenizer, tgt_tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, collate_fn=collate_fn)
    valid_loader = DataLoader(valid_dataset, batch_size=128, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
    # =======================
    # 7. LSTM MODEL / TRAINING SETUP
    # =======================
    EMBED_DIM = 256
    ENC_HIDDEN_DIM = 256
    DEC_HIDDEN_DIM = 256
    NUM_LAYERS_MODEL = 2
    DROPOUT = 0.3
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"\nUsing device: {device}")
    encoder = Encoder(len(src_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT)
    decoder = Decoder(len(tgt_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, DEC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT)
    model = Seq2Seq(encoder, decoder, device).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # =======================
    # 8. LSTM TRAINING LOOP
    # =======================
    print("\n" + "="*80)
    print("TRAINING LSTM MODEL")
    print("="*80 + "\n")
    NUM_EPOCHS = 10
    for epoch in range(NUM_EPOCHS):
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
        valid_loss = evaluate(model, valid_loader, criterion, device)
        print(f'Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}')
    torch.save({
        'model_state_dict': model.state_dict(),
        'src_vocab': src_tokenizer.char2idx,
        'tgt_vocab': tgt_tokenizer.char2idx,
    }, 'lstm_transliterator.pt')
    # =======================
    # 9. LSTM EVALUATION WITH FULL DETAILS
    # =======================
    print("\n" + "="*80)
    print("LSTM MODEL EVALUATION")
    print("="*80)
    # Greedy decoding
    greedy_results_lstm, greedy_metrics_lstm = evaluate_with_full_details(
        model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/lstm_test_results_greedy.jsonl',
        decoding='greedy'
    )
    # Beam search decoding
    beam_results_lstm, beam_metrics_lstm = evaluate_with_full_details(
        model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/lstm_test_results_beam.jsonl',
        decoding='beam',
        beam_width=5
    )
    test_words = ['namaste', 'dhanyavaad', 'bharat']
    print("\nQuick manual checks (LSTM):")
    for word in test_words:
        print(f"{word} -> {model.translate(word, src_tokenizer, tgt_tokenizer)}")
    # =======================
    # 11. TRANSFORMER TRAINING
    # =======================
    print("\n" + "="*80)
    print("TRAINING TRANSFORMER MODEL")
    print("="*80 + "\n")
    D_MODEL = 256
    NHEAD = 8
    NUM_ENCODER_LAYERS = 2
    NUM_DECODER_LAYERS = 2
    DIM_FEEDFORWARD = 512
    DROPOUT = 0.1
    transformer_model = TransformerTransliterator(
        src_vocab_size=len(src_tokenizer),
        tgt_vocab_size=len(tgt_tokenizer),
        d_model=D_MODEL,
        nhead=NHEAD,
        num_encoder_layers=NUM_ENCODER_LAYERS,
        num_decoder_layers=NUM_DECODER_LAYERS,
        dim_feedforward=DIM_FEEDFORWARD,
        dropout=DROPOUT
    ).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=0.1)
    optimizer = torch.optim.Adam(transformer_model.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
    NUM_EPOCHS = 10
    for epoch in range(NUM_EPOCHS):
        train_loss = train_transformer_epoch(transformer_model, train_loader, optimizer, criterion, device)
        valid_loss = evaluate_transformer(transformer_model, valid_loader, criterion, device)
        scheduler.step(valid_loss)
        print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}")
    torch.save({
        'model_state_dict': transformer_model.state_dict(),
        'src_vocab': src_tokenizer.char2idx,
        'tgt_vocab': tgt_tokenizer.char2idx,
    }, 'transformer_transliterator.pt')
    # =======================
    # 12. TRANSFORMER EVALUATION WITH FULL DETAILS
    # =======================
    print("\n" + "="*80)
    print("TRANSFORMER MODEL EVALUATION")
    print("="*80)
    # Greedy decoding
    greedy_results_transformer, greedy_metrics_transformer = evaluate_with_full_details(
        transformer_model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/transformer_test_results_greedy.jsonl',
        decoding='greedy'
    )
    # Beam search decoding
    beam_results_transformer, beam_metrics_transformer = evaluate_with_full_details(
        transformer_model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/transformer_test_results_beam.jsonl',
        decoding='beam',
        beam_width=5
    )
    test_words = ['namaste', 'dhanyavaad', 'bharat', 'mumbai', 'hindustan']
    print("\nTest Translations (Transformer):")
    for word in test_words:
        translated = transformer_model.translate(word, src_tokenizer, tgt_tokenizer, device=device)
        print(f"{word} -> {translated}")
    # =======================
    # 13. FINAL SUMMARY
    # =======================
    print("\n" + "="*80)
    print("TRAINING COMPLETE - SUMMARY")
    print("="*80 + "\n")
    print("LSTM MODEL:")
    print(f"  Greedy  - Word Acc: {greedy_metrics_lstm['word_accuracy']:.4f}, Char F1: {greedy_metrics_lstm['char_f1']:.4f}, Char Acc: {greedy_metrics_lstm['char_accuracy']:.4f}")
    print(f"  Beam(5) - Word Acc: {beam_metrics_lstm['word_accuracy']:.4f}, Char F1: {beam_metrics_lstm['char_f1']:.4f}, Char Acc: {beam_metrics_lstm['char_accuracy']:.4f}")
    print("\nTRANSFORMER MODEL:")
    print(f"  Greedy  - Word Acc: {greedy_metrics_transformer['word_accuracy']:.4f}, Char F1: {greedy_metrics_transformer['char_f1']:.4f}, Char Acc: {greedy_metrics_transformer['char_accuracy']:.4f}")
    print(f"  Beam(5) - Word Acc: {beam_metrics_transformer['word_accuracy']:.4f}, Char F1: {beam_metrics_transformer['char_f1']:.4f}, Char Acc: {beam_metrics_transformer['char_accuracy']:.4f}")
    print("\n📁 OUTPUT FILES:")
    print("  Data Statistics:")
    print("    - analysis/data_stats_train_before_sampling.csv")
    print("    - analysis/data_stats_train_after_sampling.csv")
    print("    - analysis/data_stats_valid.csv")
    print("    - analysis/data_stats_test.csv")
    print("  Model Checkpoints:")
    print("    - lstm_transliterator.pt")
    print("    - transformer_transliterator.pt")
    print("  Test Results (with full metadata):")
    print("    - analysis/lstm_test_results_greedy.jsonl")
    print("    - analysis/lstm_test_results_beam.jsonl")
    print("    - analysis/transformer_test_results_greedy.jsonl")
    print("    - analysis/transformer_test_results_beam.jsonl")
    print("\n✅ All done! Check the analysis/ directory for detailed results.")
    print("="*80)