import json, os, zipfile, io, tempfile, requests from datasets import load_dataset import sys import random from collections import defaultdict, Counter import pandas as pd import numpy as np import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from collections import Counter import numpy as np from datasets import load_dataset from tqdm import tqdm import collections # sys.stdout = open("logs/train.txt", "w", encoding="utf-8") # sys.stderr = sys.stdout # 2️⃣ Clean and standardize JSON files - PRESERVE ALL FIELDS def clean_json_file(infile, outfile): data = [] with open(infile, "r", encoding="utf-8") as f: for line in f: try: obj = json.loads(line.strip()) eng = obj.get("english word") or obj.get("english_word") native = obj.get("native word") or obj.get("native_word") # Strip whitespace and filter out empty strings if eng: eng = eng.strip() if native: native = native.strip() # Only keep if both fields are non-empty after stripping if eng and native: # Preserve all fields score = obj.get("score", None) if score is None: score = float("nan") cleaned_obj = { "english word": eng, "native word": native, "source": obj.get("source", "Unknown"), "score": score, "unique_identifier": obj.get("unique_identifier", None) } data.append(cleaned_obj) except Exception: continue with open(outfile, "w", encoding="utf-8") as f: for entry in data: f.write(json.dumps(entry, ensure_ascii=False) + "\n") print(f"✅ Cleaned {len(data)} entries in {os.path.basename(infile)}") def sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3, seed=42): """ Sample a subset of a transliteration dataset. Parameters: - full_dataset: list of dicts, each with 'english word' and 'native word' - sample_size: total number of samples to return - top_freq_ratio: fraction of samples from top frequent words - seed: random seed for reproducibility Returns: - sampled_dataset: list of dicts """ random.seed(seed) # ----------------------------- # 1️⃣ Top frequent words # ----------------------------- words = [item['english word'] for item in full_dataset] freq = Counter(words) sorted_items = sorted(full_dataset, key=lambda x: freq[x['english word']], reverse=True) num_top = int(sample_size * top_freq_ratio) top_items = sorted_items[:num_top] # Remaining items for stratified sampling remaining_size = sample_size - num_top remaining_items = sorted_items[num_top:] # ----------------------------- # 2️⃣ Stratified by word length # ----------------------------- length_groups = defaultdict(list) for item in remaining_items: length_groups[len(item['english word'])].append(item) sampled_remaining = [] total_remaining_items = sum(len(v) for v in length_groups.values()) for length, items in length_groups.items(): n = int(remaining_size * len(items) / total_remaining_items) n = min(n, len(items)) sampled_remaining.extend(random.sample(items, n)) # ----------------------------- # 3️⃣ Combine and shuffle # ----------------------------- sampled_dataset = top_items + sampled_remaining if len(sampled_dataset) > sample_size: sampled_dataset = random.sample(sampled_dataset, sample_size) random.shuffle(sampled_dataset) return sampled_dataset # 4️⃣ DATA ANALYSIS FUNCTION def analyze_dataset_statistics(dataset_split, split_name="train"): """Analyze dataset statistics by source and length""" print(f"\n{'='*70}") print(f"DATASET STATISTICS - {split_name.upper()} SPLIT") print(f"{'='*70}\n") data_list = list(dataset_split) print(f"Total samples: {len(data_list):,}\n") # Group by source source_stats = defaultdict(lambda: { 'count': 0, 'english_lengths': [], 'native_lengths': [], }) for item in data_list: english_word = item.get('english word', '') native_word = item.get('native word', '') source = item.get('source', 'Unknown') source_stats[source]['count'] += 1 source_stats[source]['english_lengths'].append(len(english_word)) source_stats[source]['native_lengths'].append(len(native_word)) # Compute statistics per source stats_list = [] for source, data in sorted(source_stats.items(), key=lambda x: x[1]['count'], reverse=True): eng_lengths = data['english_lengths'] nat_lengths = data['native_lengths'] if eng_lengths and nat_lengths: stats_list.append({ 'Source': source, 'Count': data['count'], 'Percentage': f"{100 * data['count'] / len(data_list):.2f}%", 'Eng_Min': min(eng_lengths), 'Eng_Max': max(eng_lengths), 'Eng_Mean': f"{np.mean(eng_lengths):.2f}", 'Eng_Median': f"{np.median(eng_lengths):.1f}", 'Nat_Min': min(nat_lengths), 'Nat_Max': max(nat_lengths), 'Nat_Mean': f"{np.mean(nat_lengths):.2f}", 'Nat_Median': f"{np.median(nat_lengths):.1f}", }) stats_df = pd.DataFrame(stats_list) print("STATISTICS BY SOURCE:") print(stats_df.to_string(index=False)) print() # Overall length distribution all_eng_lengths = [len(item.get('english word', '')) for item in data_list] all_nat_lengths = [len(item.get('native word', '')) for item in data_list] print("OVERALL LENGTH DISTRIBUTION:") print(f"English - Min: {min(all_eng_lengths)}, Max: {max(all_eng_lengths)}, " f"Mean: {np.mean(all_eng_lengths):.2f}, Median: {np.median(all_eng_lengths):.1f}") print(f"Native - Min: {min(all_nat_lengths)}, Max: {max(all_nat_lengths)}, " f"Mean: {np.mean(all_nat_lengths):.2f}, Median: {np.median(all_nat_lengths):.1f}") print() # Length buckets print("LENGTH DISTRIBUTION (English words):") length_buckets = { '1-3': 0, '4-6': 0, '7-10': 0, '11-15': 0, '16-20': 0, '21+': 0 } for length in all_eng_lengths: if length <= 3: length_buckets['1-3'] += 1 elif length <= 6: length_buckets['4-6'] += 1 elif length <= 10: length_buckets['7-10'] += 1 elif length <= 15: length_buckets['11-15'] += 1 elif length <= 20: length_buckets['16-20'] += 1 else: length_buckets['21+'] += 1 for bucket, count in length_buckets.items(): print(f" {bucket:6s}: {count:6,} ({100*count/len(data_list):5.2f}%)") print(f"\n{'='*70}\n") return stats_df # ======================= # 1. CHARACTER-LEVEL TOKENIZER # ======================= class CharTokenizer: """Character-level tokenizer for transliteration""" def __init__(self, vocab=None): self.pad_token = '' self.sos_token = '' self.eos_token = '' self.unk_token = '' if vocab is None: self.char2idx = { self.pad_token: 0, self.sos_token: 1, self.eos_token: 2, self.unk_token: 3 } self.idx2char = {v: k for k, v in self.char2idx.items()} else: self.char2idx = vocab self.idx2char = {v: k for k, v in self.char2idx.items()} def fit(self, texts): """Build vocabulary from texts""" char_counts = Counter() for text in texts: char_counts.update(text) # Add characters to vocabulary (sorted for consistency) for char, _ in sorted(char_counts.items()): if char not in self.char2idx: self.char2idx[char] = len(self.char2idx) self.idx2char = {v: k for k, v in self.char2idx.items()} return self def encode(self, text, add_special_tokens=True): """Convert text to indices""" if add_special_tokens: indices = [self.char2idx[self.sos_token]] indices.extend([self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text]) indices.append(self.char2idx[self.eos_token]) else: indices = [self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text] return indices def decode(self, indices, skip_special_tokens=True): """Convert indices back to text""" chars = [] for idx in indices: # handle both ints and tensors if isinstance(idx, torch.Tensor): idx = idx.item() char = self.idx2char.get(idx, self.unk_token) if skip_special_tokens and char in [self.pad_token, self.sos_token, self.eos_token]: continue chars.append(char) return ''.join(chars) def __len__(self): return len(self.char2idx) # ======================= # 2. DATASET CLASS # ======================= class TransliterationDataset(Dataset): """Dataset for transliteration task""" def __init__(self, data, src_tokenizer, tgt_tokenizer, max_len=50): # data: an iterable of dict-like objects with 'english word' and 'native word' self.data = data self.src_tokenizer = src_tokenizer self.tgt_tokenizer = tgt_tokenizer self.max_len = max_len def __len__(self): return len(self.data) def __getitem__(self, idx): item = self.data[idx] src = item['english word'] tgt = item['native word'] # Encode src_ids = self.src_tokenizer.encode(src) tgt_ids = self.tgt_tokenizer.encode(tgt) # Truncate if needed src_ids = src_ids[:self.max_len] tgt_ids = tgt_ids[:self.max_len] return { 'src_ids': torch.tensor(src_ids, dtype=torch.long), 'tgt_ids': torch.tensor(tgt_ids, dtype=torch.long), 'src_text': src, 'tgt_text': tgt } def collate_fn(batch): """Custom collate function to pad sequences""" src_ids = [item['src_ids'] for item in batch] tgt_ids = [item['tgt_ids'] for item in batch] # Pad sequences src_ids = nn.utils.rnn.pad_sequence(src_ids, batch_first=True, padding_value=0) tgt_ids = nn.utils.rnn.pad_sequence(tgt_ids, batch_first=True, padding_value=0) return { 'src_ids': src_ids, 'tgt_ids': tgt_ids, 'src_text': [item['src_text'] for item in batch], 'tgt_text': [item['tgt_text'] for item in batch] } # ======================= # 3. LSTM ENCODER-DECODER WITH ATTENTION # ======================= class Encoder(nn.Module): """LSTM Encoder""" def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2, dropout=0.3): super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0) self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0, bidirectional=True) self.dropout = nn.Dropout(dropout) def forward(self, x): embedded = self.dropout(self.embedding(x)) outputs, (hidden, cell) = self.lstm(embedded) return outputs, hidden, cell class Attention(nn.Module): """Bahdanau Attention Mechanism""" def __init__(self, dec_hidden_dim, enc_hidden_dim): super().__init__() self.attn = nn.Linear(dec_hidden_dim + enc_hidden_dim*2, dec_hidden_dim) self.v = nn.Linear(dec_hidden_dim, 1, bias=False) def forward(self, hidden, encoder_outputs, mask=None): batch_size = encoder_outputs.shape[0] src_len = encoder_outputs.shape[1] hidden = hidden.unsqueeze(1).repeat(1, src_len, 1) energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2))) attention = self.v(energy).squeeze(2) if mask is not None: attention = attention.masked_fill(mask == 0, -1e10) attn_weights = torch.softmax(attention, dim=1) context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs) return context.squeeze(1), attn_weights.squeeze(1) class Decoder(nn.Module): """LSTM Decoder with Attention""" def __init__(self, vocab_size, embed_dim, enc_hidden_dim, dec_hidden_dim, num_layers=2, dropout=0.3): super().__init__() self.vocab_size = vocab_size self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0) self.attention = Attention(dec_hidden_dim, enc_hidden_dim) self.lstm = nn.LSTM(embed_dim + enc_hidden_dim*2, dec_hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0) self.fc_out = nn.Linear(dec_hidden_dim + enc_hidden_dim*2 + embed_dim, vocab_size) self.dropout = nn.Dropout(dropout) self.enc_hidden_dim = enc_hidden_dim self.dec_hidden_dim = dec_hidden_dim def forward(self, input, hidden, cell, encoder_outputs, mask=None): input = input.unsqueeze(1) embedded = self.dropout(self.embedding(input)) context, attn_weights = self.attention(hidden[-1], encoder_outputs, mask) context = context.unsqueeze(1) rnn_input = torch.cat((embedded, context), dim=2) output, (hidden, cell) = self.lstm(rnn_input, (hidden, cell)) output = output.squeeze(1) embedded = embedded.squeeze(1) context = context.squeeze(1) prediction = self.fc_out(torch.cat((output, context, embedded), dim=1)) return prediction, hidden, cell, attn_weights class Seq2Seq(nn.Module): """Complete Sequence-to-Sequence Model""" def beam_search_decode(self, src, src_tokenizer, tgt_tokenizer, max_len=50, beam_width=3): """Beam search decoding for Seq2Seq""" self.eval() with torch.no_grad(): if isinstance(src, str): src_ids = src_tokenizer.encode(src) src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device) else: src_tensor = src.to(self.device) encoder_outputs, hidden, cell = self.encoder(src_tensor) num_layers = self.decoder.lstm.num_layers hidden = hidden.view(num_layers, 2, 1, -1) hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2) cell = cell.view(num_layers, 2, 1, -1) cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2) hidden = self.hidden_projection(hidden) cell = self.cell_projection(cell) mask = self.create_mask(src_tensor) start_token = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token] beams = [(torch.tensor([start_token], device=self.device), 0.0, hidden, cell)] completed_sequences = [] for _ in range(max_len): new_beams = [] for seq, log_prob, h, c in beams: input_token = seq[-1].unsqueeze(0) output, h_new, c_new, _ = self.decoder(input_token, h, c, encoder_outputs, mask) probs = torch.log_softmax(output, dim=1).squeeze(0) topk_probs, topk_idx = probs.topk(beam_width) for prob, idx in zip(topk_probs, topk_idx): new_seq = torch.cat([seq, idx.unsqueeze(0)]) new_log_prob = log_prob + prob.item() new_beams.append((new_seq, new_log_prob, h_new, c_new)) new_beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width] beams = [] for seq, log_prob, h, c in new_beams: if seq[-1].item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]: completed_sequences.append((seq, log_prob)) else: beams.append((seq, log_prob, h, c)) if not beams: break if len(completed_sequences) == 0: completed_sequences = beams best_seq = max(completed_sequences, key=lambda x: x[1])[0] return tgt_tokenizer.decode(best_seq[1:], skip_special_tokens=True) def __init__(self, encoder, decoder, device): super().__init__() self.encoder = encoder self.decoder = decoder self.device = device enc_hidden_dim = encoder.lstm.hidden_size dec_hidden_dim = decoder.dec_hidden_dim num_layers = decoder.lstm.num_layers self.hidden_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim) self.cell_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim) def create_mask(self, src): mask = (src != 0) return mask def forward(self, src, tgt, teacher_forcing_ratio=0.5): batch_size = src.shape[0] tgt_len = tgt.shape[1] tgt_vocab_size = self.decoder.vocab_size outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size).to(self.device) encoder_outputs, hidden, cell = self.encoder(src) num_layers = self.decoder.lstm.num_layers hidden = hidden.view(num_layers, 2, batch_size, -1) cell = cell.view(num_layers, 2, batch_size, -1) hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2) cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2) hidden = self.hidden_projection(hidden) cell = self.cell_projection(cell) mask = self.create_mask(src) input = tgt[:, 0] for t in range(1, tgt_len): output, hidden, cell, attn_weights = self.decoder( input, hidden, cell, encoder_outputs, mask ) outputs[:, t] = output teacher_force = torch.rand(1).item() < teacher_forcing_ratio top1 = output.argmax(1) input = tgt[:, t] if teacher_force else top1 return outputs def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50): """Translate a single source sequence (greedy decoding)""" self.eval() with torch.no_grad(): if isinstance(src, str): src_ids = src_tokenizer.encode(src) src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device) else: src_tensor = src.to(self.device) encoder_outputs, hidden, cell = self.encoder(src_tensor) num_layers = self.decoder.lstm.num_layers hidden = hidden.view(num_layers, 2, 1, -1) hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2) cell = cell.view(num_layers, 2, 1, -1) cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2) hidden = self.hidden_projection(hidden) cell = self.cell_projection(cell) mask = self.create_mask(src_tensor) input = torch.tensor([tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]]).to(self.device) outputs = [] for _ in range(max_len): output, hidden, cell, _ = self.decoder(input, hidden, cell, encoder_outputs, mask) top1 = output.argmax(1) outputs.append(top1.item()) if top1.item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]: break input = top1 return tgt_tokenizer.decode(outputs, skip_special_tokens=True) # ======================= # 4. TRAINING / EVAL HELPERS # ======================= def train_epoch(model, dataloader, optimizer, criterion, device, clip=1.0): model.train() epoch_loss = 0 for batch in dataloader: src = batch['src_ids'].to(device) tgt = batch['tgt_ids'].to(device) optimizer.zero_grad() output = model(src, tgt) output = output[:, 1:].reshape(-1, output.shape[-1]) tgt_flat = tgt[:, 1:].reshape(-1) loss = criterion(output, tgt_flat) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), clip) optimizer.step() epoch_loss += loss.item() return epoch_loss / len(dataloader) def char_overlap_f1(pred, true): """Character-level overlap F1 per word""" pred_counts = collections.Counter(pred) true_counts = collections.Counter(true) overlap = sum((pred_counts & true_counts).values()) if overlap == 0: return 0.0 precision = overlap / len(pred) recall = overlap / len(true) return 2 * precision * recall / (precision + recall) def evaluate(model, dataloader, criterion, device): model.eval() epoch_loss = 0 with torch.no_grad(): for batch in dataloader: src = batch['src_ids'].to(device) tgt = batch['tgt_ids'].to(device) # No teacher forcing output = model(src, tgt, teacher_forcing_ratio=0.0) output = output[:, 1:].reshape(-1, output.shape[-1]) tgt_flat = tgt[:, 1:].reshape(-1) loss = criterion(output, tgt_flat) epoch_loss += loss.item() return epoch_loss / len(dataloader) # ======================= # 5. ENHANCED EVALUATION WITH FULL DETAILS # ======================= def evaluate_with_full_details(model, dataloader, src_tokenizer, tgt_tokenizer, device, output_file='test_results.jsonl', decoding="greedy", beam_width=3, max_samples=None): """ Evaluate model and save detailed results with all metadata """ model.eval() results = [] word_correct = 0 char_correct = 0 char_total = 0 print(f"\n{'='*70}") print(f"DETAILED EVALUATION - {decoding.upper()} DECODING") print(f"{'='*70}\n") with torch.no_grad(): sample_idx = 0 char_f1_list = [] for batch_idx, batch in enumerate(tqdm(dataloader, desc="Evaluating")): src = batch['src_ids'].to(device) src_texts = batch['src_text'] tgt_texts = batch['tgt_text'] for i in range(len(src_texts)): src_text = src_texts[i] tgt_text = tgt_texts[i] src_tensor = src[i].unsqueeze(0) # Generate prediction if hasattr(model, 'beam_search_decode') and decoding == "beam": pred_text = model.beam_search_decode( src_tensor, src_tokenizer, tgt_tokenizer, max_len=100, beam_width=beam_width ) else: pred_text = model.translate( src_tensor, src_tokenizer, tgt_tokenizer, max_len=100 ) pred_text = pred_text.strip() tgt_text = tgt_text.strip() src_text = src_text.strip() # Compute metrics is_correct = (pred_text == tgt_text) if is_correct: word_correct += 1 min_len = min(len(pred_text), len(tgt_text)) char_matches = sum(1 for j in range(min_len) if pred_text[j] == tgt_text[j]) char_correct += char_matches char_total += len(tgt_text) char_accuracy = char_matches / len(tgt_text) if len(tgt_text) > 0 else 0.0 char_f1 = char_overlap_f1(pred_text.strip(), tgt_text.strip()) char_f1_list.append(char_f1) # Get original metadata from dataset # try: original_item = dataloader.dataset.data[sample_idx] source = original_item.get('source', 'Unknown') score = original_item.get('score', None) unique_id = f'sample_{sample_idx}' # Store complete result with all original fields result = { 'unique_identifier': unique_id, 'source': source, 'score': score, 'english_word': src_text, 'native_word': tgt_text, 'predicted_word': pred_text, 'is_correct': is_correct, 'english_length': len(src_text), 'native_length': len(tgt_text), 'predicted_length': len(pred_text), 'char_accuracy': char_accuracy, 'char_f1': char_f1, 'decoding_method': decoding, } if decoding == "beam": result['beam_width'] = beam_width results.append(result) sample_idx += 1 if max_samples and len(results) >= max_samples: break if max_samples and len(results) >= max_samples: break # Create DataFrame results_df = pd.DataFrame(results) # Calculate metrics total_samples = len(results) word_accuracy = word_correct / total_samples if total_samples > 0 else 0.0 char_accuracy = char_correct / char_total if char_total > 0 else 0.0 char_f1 = np.mean(char_f1_list) if len(char_f1_list) > 0 else 0.0 print(f"\nOVERALL METRICS:") print(f" Total Samples: {total_samples:,}") print(f" Word Accuracy: {word_accuracy:.4f}") print(f" Char Accuracy: {char_accuracy:.4f}\n") print(f" Char F1: {char_f1:.4f}\n") # Statistics by source if 'source' in results_df.columns and results_df['source'].nunique() > 1: print("ACCURACY BY SOURCE:") source_stats = results_df.groupby('source').agg({ 'is_correct': ['count', 'sum', 'mean'] }).round(4) source_stats.columns = ['Count', 'Correct', 'Accuracy'] print(source_stats.to_string()) print() # Statistics by length results_df['length_bucket'] = pd.cut( results_df['english_length'], bins=[0, 3, 6, 10, 15, 20, 100], labels=['1-3', '4-6', '7-10', '11-15', '16-20', '21+'] ) print("ACCURACY BY LENGTH:") length_stats = results_df.groupby('length_bucket').agg({ 'is_correct': ['count', 'mean'] }).round(4) length_stats.columns = ['Count', 'Accuracy'] print(length_stats.to_string()) print() # Show some examples print("SAMPLE PREDICTIONS:") for idx in range(min(10, len(results_df))): row = results_df.iloc[idx] ok = "✓" if row['is_correct'] else "✗" print(f"{ok} {row['english_word']} -> {row['predicted_word']} (expected: {row['native_word']})") print() # Save results os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else '.', exist_ok=True) results_df.to_json(output_file, orient='records', lines=True, force_ascii=False) print(f"✅ Results saved to: {output_file}\n") metrics = { 'total_samples': total_samples, 'word_correct': word_correct, 'word_accuracy': word_accuracy, 'char_accuracy': char_accuracy, 'char_f1': char_f1, 'decoding_method': decoding } if decoding == "beam": metrics['beam_width'] = beam_width return results_df, metrics # ======================= # 10. TRANSFORMER MODEL # ======================= import math class PositionalEncoding(nn.Module): """Sinusoidal positional encoding""" def __init__(self, d_model, max_len=5000, dropout=0.1): super().__init__() self.dropout = nn.Dropout(p=dropout) pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:, :x.size(1), :] return self.dropout(x) class TransformerTransliterator(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, nhead=8, num_encoder_layers=2, num_decoder_layers=2, dim_feedforward=512, dropout=0.1, max_len=100): super().__init__() self.d_model = d_model self.src_vocab_size = src_vocab_size self.tgt_vocab_size = tgt_vocab_size # Embeddings self.src_embedding = nn.Embedding(src_vocab_size, d_model, padding_idx=0) self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model, padding_idx=0) # Positional encoding self.pos_encoder = PositionalEncoding(d_model, max_len, dropout) # Transformer self.transformer = nn.Transformer( d_model=d_model, nhead=nhead, num_encoder_layers=num_encoder_layers, num_decoder_layers=num_decoder_layers, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True ) # Output layer self.fc_out = nn.Linear(d_model, tgt_vocab_size) self._init_weights() def _init_weights(self): initrange = 0.1 self.src_embedding.weight.data.uniform_(-initrange, initrange) self.tgt_embedding.weight.data.uniform_(-initrange, initrange) self.fc_out.bias.data.zero_() self.fc_out.weight.data.uniform_(-initrange, initrange) def generate_square_subsequent_mask(self, sz): """Causal mask for decoder""" mask = torch.triu(torch.ones(sz, sz), diagonal=1).bool() mask = mask.masked_fill(mask, float('-inf')) return mask def create_padding_mask(self, seq, pad_idx=0): """Mask for padding tokens""" return (seq == pad_idx) def forward(self, src, tgt): tgt_len = tgt.shape[1] tgt_mask = self.generate_square_subsequent_mask(tgt_len).to(tgt.device) src_padding_mask = self.create_padding_mask(src) tgt_padding_mask = self.create_padding_mask(tgt) src_emb = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model)) tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model)) output = self.transformer( src_emb, tgt_emb, tgt_mask=tgt_mask, src_key_padding_mask=src_padding_mask, tgt_key_padding_mask=tgt_padding_mask, memory_key_padding_mask=src_padding_mask ) output = self.fc_out(output) return output def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50, device='cpu', decoding="greedy", beam_width=3): """Greedy or Beam Search decoding""" self.eval() with torch.no_grad(): if isinstance(src, str): src_ids = src_tokenizer.encode(src) src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(device) else: src_tensor = src.to(device) sos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token] eos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.eos_token] if decoding == "beam": # Initialize beams: list of (sequence, score) beams = [([sos_idx], 0.0)] for _ in range(max_len): new_beams = [] for seq, score in beams: tgt_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device) output = self.forward(src_tensor, tgt_tensor) logits = output[0, -1, :] probs = torch.log_softmax(logits, dim=-1) topk_probs, topk_indices = probs.topk(beam_width) for k in range(beam_width): next_seq = seq + [topk_indices[k].item()] next_score = score + topk_probs[k].item() new_beams.append((next_seq, next_score)) # Keep top beam_width sequences beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width] # Stop if all beams have ended if all(seq[-1] == eos_idx for seq, _ in beams): break best_seq = beams[0][0] return tgt_tokenizer.decode(best_seq, skip_special_tokens=True) else: # Greedy decoding tgt_indices = [sos_idx] for _ in range(max_len): tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device) output = self.forward(src_tensor, tgt_tensor) next_token = output[0, -1, :].argmax().item() tgt_indices.append(next_token) if next_token == eos_idx: break return tgt_tokenizer.decode(tgt_indices, skip_special_tokens=True) def train_transformer_epoch(model, dataloader, optimizer, criterion, device): model.train() total_loss = 0 for batch in dataloader: src = batch['src_ids'].to(device) tgt = batch['tgt_ids'].to(device) tgt_input = tgt[:, :-1] tgt_output = tgt[:, 1:] optimizer.zero_grad() output = model(src, tgt_input) loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1)) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() total_loss += loss.item() return total_loss / len(dataloader) def evaluate_transformer(model, dataloader, criterion, device): model.eval() total_loss = 0 with torch.no_grad(): for batch in dataloader: src = batch['src_ids'].to(device) tgt = batch['tgt_ids'].to(device) tgt_input = tgt[:, :-1] tgt_output = tgt[:, 1:] output = model(src, tgt_input) loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1)) total_loss += loss.item() return total_loss / len(dataloader) if __name__ == "__main__": # 1️⃣ Download and extract Hindi Aksharantar url = "https://huggingface.co/datasets/ai4bharat/Aksharantar/resolve/main/hin.zip" resp = requests.get(url) resp.raise_for_status() tmpdir = tempfile.mkdtemp() with zipfile.ZipFile(io.BytesIO(resp.content), "r") as zip_ref: zip_ref.extractall(tmpdir) print("Extracted:", os.listdir(tmpdir)) clean_dir = os.path.join(tmpdir, "cleaned") os.makedirs(clean_dir, exist_ok=True) for split in ["train", "valid", "test"]: clean_json_file( os.path.join(tmpdir, f"hin_{split}.json"), os.path.join(clean_dir, f"{split}.json") ) # 3️⃣ Load cleaned dataset dataset = load_dataset( "json", data_files={ "train": os.path.join(clean_dir, "train.json"), "validation": os.path.join(clean_dir, "valid.json"), "test": os.path.join(clean_dir, "test.json"), }, ) print(dataset) print(dataset["train"][0]) # Analyze BEFORE sampling print("\n" + "="*80) print("DATA ANALYSIS - BEFORE SAMPLING") print("="*80) train_stats_before = analyze_dataset_statistics(dataset['train'], 'train (before sampling)') valid_stats = analyze_dataset_statistics(dataset['validation'], 'validation') test_stats = analyze_dataset_statistics(dataset['test'], 'test') # Save statistics os.makedirs("analysis", exist_ok=True) train_stats_before.to_csv('analysis/data_stats_train_before_sampling.csv', index=False) valid_stats.to_csv('analysis/data_stats_valid.csv', index=False) test_stats.to_csv('analysis/data_stats_test.csv', index=False) full_dataset = list(dataset['train']) dataset['train'] = sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3) print(f"\n✅ Sampled dataset size: {len(dataset['train'])}") print("Example entries:", dataset['train'][:5]) # Analyze AFTER sampling print("\n" + "="*80) print("DATA ANALYSIS - AFTER SAMPLING") print("="*80) train_stats_after = analyze_dataset_statistics(dataset['train'], 'train (after sampling)') train_stats_after.to_csv('analysis/data_stats_train_after_sampling.csv', index=False) # ======================= # 6. TOKENIZERS + DATALOADERS # ======================= src_tokenizer = CharTokenizer() tgt_tokenizer = CharTokenizer() # fit tokenizers on training set train_items = list(dataset['train']) valid_items = list(dataset['validation']) test_items = list(dataset['test']) # Build tokenizer vocab from train split src_texts = [item['english word'] for item in train_items] tgt_texts = [item['native word'] for item in train_items] src_tokenizer.fit(src_texts) tgt_tokenizer.fit(tgt_texts) print(f"\nSource vocab size: {len(src_tokenizer)}") print(f"Target vocab size: {len(tgt_tokenizer)}") train_split = train_items valid_split = valid_items test_split = test_items train_dataset = TransliterationDataset(train_split, src_tokenizer, tgt_tokenizer) valid_dataset = TransliterationDataset(valid_split, src_tokenizer, tgt_tokenizer) test_dataset = TransliterationDataset(test_split, src_tokenizer, tgt_tokenizer) train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, collate_fn=collate_fn) valid_loader = DataLoader(valid_dataset, batch_size=128, shuffle=False, collate_fn=collate_fn) test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn) # ======================= # 7. LSTM MODEL / TRAINING SETUP # ======================= EMBED_DIM = 256 ENC_HIDDEN_DIM = 256 DEC_HIDDEN_DIM = 256 NUM_LAYERS_MODEL = 2 DROPOUT = 0.3 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"\nUsing device: {device}") encoder = Encoder(len(src_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT) decoder = Decoder(len(tgt_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, DEC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT) model = Seq2Seq(encoder, decoder, device).to(device) criterion = nn.CrossEntropyLoss(ignore_index=0) optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # ======================= # 8. LSTM TRAINING LOOP # ======================= print("\n" + "="*80) print("TRAINING LSTM MODEL") print("="*80 + "\n") NUM_EPOCHS = 10 for epoch in range(NUM_EPOCHS): train_loss = train_epoch(model, train_loader, optimizer, criterion, device) valid_loss = evaluate(model, valid_loader, criterion, device) print(f'Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}') torch.save({ 'model_state_dict': model.state_dict(), 'src_vocab': src_tokenizer.char2idx, 'tgt_vocab': tgt_tokenizer.char2idx, }, 'lstm_transliterator.pt') # ======================= # 9. LSTM EVALUATION WITH FULL DETAILS # ======================= print("\n" + "="*80) print("LSTM MODEL EVALUATION") print("="*80) # Greedy decoding greedy_results_lstm, greedy_metrics_lstm = evaluate_with_full_details( model, test_loader, src_tokenizer, tgt_tokenizer, device=device, output_file='analysis/lstm_test_results_greedy.jsonl', decoding='greedy' ) # Beam search decoding beam_results_lstm, beam_metrics_lstm = evaluate_with_full_details( model, test_loader, src_tokenizer, tgt_tokenizer, device=device, output_file='analysis/lstm_test_results_beam.jsonl', decoding='beam', beam_width=5 ) test_words = ['namaste', 'dhanyavaad', 'bharat'] print("\nQuick manual checks (LSTM):") for word in test_words: print(f"{word} -> {model.translate(word, src_tokenizer, tgt_tokenizer)}") # ======================= # 11. TRANSFORMER TRAINING # ======================= print("\n" + "="*80) print("TRAINING TRANSFORMER MODEL") print("="*80 + "\n") D_MODEL = 256 NHEAD = 8 NUM_ENCODER_LAYERS = 2 NUM_DECODER_LAYERS = 2 DIM_FEEDFORWARD = 512 DROPOUT = 0.1 transformer_model = TransformerTransliterator( src_vocab_size=len(src_tokenizer), tgt_vocab_size=len(tgt_tokenizer), d_model=D_MODEL, nhead=NHEAD, num_encoder_layers=NUM_ENCODER_LAYERS, num_decoder_layers=NUM_DECODER_LAYERS, dim_feedforward=DIM_FEEDFORWARD, dropout=DROPOUT ).to(device) criterion = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=0.1) optimizer = torch.optim.Adam(transformer_model.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2) NUM_EPOCHS = 10 for epoch in range(NUM_EPOCHS): train_loss = train_transformer_epoch(transformer_model, train_loader, optimizer, criterion, device) valid_loss = evaluate_transformer(transformer_model, valid_loader, criterion, device) scheduler.step(valid_loss) print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}") torch.save({ 'model_state_dict': transformer_model.state_dict(), 'src_vocab': src_tokenizer.char2idx, 'tgt_vocab': tgt_tokenizer.char2idx, }, 'transformer_transliterator.pt') # ======================= # 12. TRANSFORMER EVALUATION WITH FULL DETAILS # ======================= print("\n" + "="*80) print("TRANSFORMER MODEL EVALUATION") print("="*80) # Greedy decoding greedy_results_transformer, greedy_metrics_transformer = evaluate_with_full_details( transformer_model, test_loader, src_tokenizer, tgt_tokenizer, device=device, output_file='analysis/transformer_test_results_greedy.jsonl', decoding='greedy' ) # Beam search decoding beam_results_transformer, beam_metrics_transformer = evaluate_with_full_details( transformer_model, test_loader, src_tokenizer, tgt_tokenizer, device=device, output_file='analysis/transformer_test_results_beam.jsonl', decoding='beam', beam_width=5 ) test_words = ['namaste', 'dhanyavaad', 'bharat', 'mumbai', 'hindustan'] print("\nTest Translations (Transformer):") for word in test_words: translated = transformer_model.translate(word, src_tokenizer, tgt_tokenizer, device=device) print(f"{word} -> {translated}") # ======================= # 13. FINAL SUMMARY # ======================= print("\n" + "="*80) print("TRAINING COMPLETE - SUMMARY") print("="*80 + "\n") print("LSTM MODEL:") print(f" Greedy - Word Acc: {greedy_metrics_lstm['word_accuracy']:.4f}, Char F1: {greedy_metrics_lstm['char_f1']:.4f}, Char Acc: {greedy_metrics_lstm['char_accuracy']:.4f}") print(f" Beam(5) - Word Acc: {beam_metrics_lstm['word_accuracy']:.4f}, Char F1: {beam_metrics_lstm['char_f1']:.4f}, Char Acc: {beam_metrics_lstm['char_accuracy']:.4f}") print("\nTRANSFORMER MODEL:") print(f" Greedy - Word Acc: {greedy_metrics_transformer['word_accuracy']:.4f}, Char F1: {greedy_metrics_transformer['char_f1']:.4f}, Char Acc: {greedy_metrics_transformer['char_accuracy']:.4f}") print(f" Beam(5) - Word Acc: {beam_metrics_transformer['word_accuracy']:.4f}, Char F1: {beam_metrics_transformer['char_f1']:.4f}, Char Acc: {beam_metrics_transformer['char_accuracy']:.4f}") print("\n📁 OUTPUT FILES:") print(" Data Statistics:") print(" - analysis/data_stats_train_before_sampling.csv") print(" - analysis/data_stats_train_after_sampling.csv") print(" - analysis/data_stats_valid.csv") print(" - analysis/data_stats_test.csv") print(" Model Checkpoints:") print(" - lstm_transliterator.pt") print(" - transformer_transliterator.pt") print(" Test Results (with full metadata):") print(" - analysis/lstm_test_results_greedy.jsonl") print(" - analysis/lstm_test_results_beam.jsonl") print(" - analysis/transformer_test_results_greedy.jsonl") print(" - analysis/transformer_test_results_beam.jsonl") print("\n✅ All done! Check the analysis/ directory for detailed results.") print("="*80)