import collections
import io
import json
import math
import os
import random
import sys
import tempfile
import zipfile
from collections import defaultdict, Counter

import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# sys.stdout = open("logs/train.txt", "w", encoding="utf-8")
# sys.stderr = sys.stdout

# 2️⃣ Clean and standardize JSON files - PRESERVE ALL FIELDS
def clean_json_file(infile, outfile):
    data = []
    with open(infile, "r", encoding="utf-8") as f:
        for line in f:
            try:
                obj = json.loads(line.strip())
                eng = obj.get("english word") or obj.get("english_word")
                native = obj.get("native word") or obj.get("native_word")
                # Strip whitespace and filter out empty strings
                if eng:
                    eng = eng.strip()
                if native:
                    native = native.strip()
                # Only keep if both fields are non-empty after stripping
                if eng and native:
                    # Preserve all fields
                    score = obj.get("score", None)
                    if score is None:
                        score = float("nan")
                    cleaned_obj = {
                        "english word": eng,
                        "native word": native,
                        "source": obj.get("source", "Unknown"),
                        "score": score,
                        "unique_identifier": obj.get("unique_identifier", None)
                    }
                    data.append(cleaned_obj)
            except Exception:
                continue
    with open(outfile, "w", encoding="utf-8") as f:
        for entry in data:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    print(f"✅ Cleaned {len(data)} entries in {os.path.basename(infile)}")

def sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3, seed=42):
    """
    Sample a subset of a transliteration dataset.

    Parameters:
    - full_dataset: list of dicts, each with 'english word' and 'native word'
    - sample_size: total number of samples to return
    - top_freq_ratio: fraction of samples from top frequent words
    - seed: random seed for reproducibility

    Returns:
    - sampled_dataset: list of dicts
    """
    random.seed(seed)

    # -----------------------------
    # 1️⃣ Top frequent words
    # -----------------------------
    words = [item['english word'] for item in full_dataset]
    freq = Counter(words)
    sorted_items = sorted(full_dataset, key=lambda x: freq[x['english word']], reverse=True)
    num_top = int(sample_size * top_freq_ratio)
    top_items = sorted_items[:num_top]

    # Remaining items for stratified sampling
    remaining_size = sample_size - num_top
    remaining_items = sorted_items[num_top:]

    # -----------------------------
    # 2️⃣ Stratified by word length
    # -----------------------------
    length_groups = defaultdict(list)
    for item in remaining_items:
        length_groups[len(item['english word'])].append(item)
    sampled_remaining = []
    total_remaining_items = sum(len(v) for v in length_groups.values())
    for length, items in length_groups.items():
        n = int(remaining_size * len(items) / total_remaining_items)
        n = min(n, len(items))
        sampled_remaining.extend(random.sample(items, n))

    # -----------------------------
    # 3️⃣ Combine and shuffle
    # -----------------------------
    sampled_dataset = top_items + sampled_remaining
    if len(sampled_dataset) > sample_size:
        sampled_dataset = random.sample(sampled_dataset, sample_size)
    random.shuffle(sampled_dataset)
    return sampled_dataset
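
# Illustrative only: the sampler on a toy dataset. With sample_size=4 and
# top_freq_ratio=0.5, two slots go to the most frequent English word and the
# rest are stratified by word length (integer flooring can leave the result
# slightly under sample_size). Not called by the pipeline.
def _demo_sample_transliteration_dataset():
    toy = (
        [{'english word': 'ram', 'native word': 'राम'}] * 3
        + [{'english word': 'namaste', 'native word': 'नमस्ते'},
           {'english word': 'bharat', 'native word': 'भारत'},
           {'english word': 'dil', 'native word': 'दिल'}]
    )
    sampled = sample_transliteration_dataset(toy, sample_size=4, top_freq_ratio=0.5)
    print(len(sampled), [item['english word'] for item in sampled])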

# 4️⃣ DATA ANALYSIS FUNCTION
def analyze_dataset_statistics(dataset_split, split_name="train"):
    """Analyze dataset statistics by source and length"""
    print(f"\n{'='*70}")
    print(f"DATASET STATISTICS - {split_name.upper()} SPLIT")
    print(f"{'='*70}\n")
    data_list = list(dataset_split)
    print(f"Total samples: {len(data_list):,}\n")

    # Group by source
    source_stats = defaultdict(lambda: {
        'count': 0,
        'english_lengths': [],
        'native_lengths': [],
    })
    for item in data_list:
        english_word = item.get('english word', '')
        native_word = item.get('native word', '')
        source = item.get('source', 'Unknown')
        source_stats[source]['count'] += 1
        source_stats[source]['english_lengths'].append(len(english_word))
        source_stats[source]['native_lengths'].append(len(native_word))

    # Compute statistics per source
    stats_list = []
    for source, data in sorted(source_stats.items(), key=lambda x: x[1]['count'], reverse=True):
        eng_lengths = data['english_lengths']
        nat_lengths = data['native_lengths']
        if eng_lengths and nat_lengths:
            stats_list.append({
                'Source': source,
                'Count': data['count'],
                'Percentage': f"{100 * data['count'] / len(data_list):.2f}%",
                'Eng_Min': min(eng_lengths),
                'Eng_Max': max(eng_lengths),
                'Eng_Mean': f"{np.mean(eng_lengths):.2f}",
                'Eng_Median': f"{np.median(eng_lengths):.1f}",
                'Nat_Min': min(nat_lengths),
                'Nat_Max': max(nat_lengths),
                'Nat_Mean': f"{np.mean(nat_lengths):.2f}",
                'Nat_Median': f"{np.median(nat_lengths):.1f}",
            })
    stats_df = pd.DataFrame(stats_list)
    print("STATISTICS BY SOURCE:")
    print(stats_df.to_string(index=False))
    print()

    # Overall length distribution
    all_eng_lengths = [len(item.get('english word', '')) for item in data_list]
    all_nat_lengths = [len(item.get('native word', '')) for item in data_list]
    print("OVERALL LENGTH DISTRIBUTION:")
    print(f"English - Min: {min(all_eng_lengths)}, Max: {max(all_eng_lengths)}, "
          f"Mean: {np.mean(all_eng_lengths):.2f}, Median: {np.median(all_eng_lengths):.1f}")
    print(f"Native  - Min: {min(all_nat_lengths)}, Max: {max(all_nat_lengths)}, "
          f"Mean: {np.mean(all_nat_lengths):.2f}, Median: {np.median(all_nat_lengths):.1f}")
    print()

    # Length buckets
    print("LENGTH DISTRIBUTION (English words):")
    length_buckets = {
        '1-3': 0, '4-6': 0, '7-10': 0, '11-15': 0, '16-20': 0, '21+': 0
    }
    for length in all_eng_lengths:
        if length <= 3:
            length_buckets['1-3'] += 1
        elif length <= 6:
            length_buckets['4-6'] += 1
        elif length <= 10:
            length_buckets['7-10'] += 1
        elif length <= 15:
            length_buckets['11-15'] += 1
        elif length <= 20:
            length_buckets['16-20'] += 1
        else:
            length_buckets['21+'] += 1
    for bucket, count in length_buckets.items():
        print(f"  {bucket:6s}: {count:6,} ({100*count/len(data_list):5.2f}%)")
    print(f"\n{'='*70}\n")
    return stats_df

# =======================
# 1. CHARACTER-LEVEL TOKENIZER
# =======================
class CharTokenizer:
    """Character-level tokenizer for transliteration"""

    def __init__(self, vocab=None):
        self.pad_token = '<PAD>'
        self.sos_token = '<SOS>'
        self.eos_token = '<EOS>'
        self.unk_token = '<UNK>'
        if vocab is None:
            self.char2idx = {
                self.pad_token: 0,
                self.sos_token: 1,
                self.eos_token: 2,
                self.unk_token: 3
            }
            self.idx2char = {v: k for k, v in self.char2idx.items()}
        else:
            self.char2idx = vocab
            self.idx2char = {v: k for k, v in self.char2idx.items()}

    def fit(self, texts):
        """Build vocabulary from texts"""
        char_counts = Counter()
        for text in texts:
            char_counts.update(text)
        # Add characters to vocabulary (sorted for consistency)
        for char, _ in sorted(char_counts.items()):
            if char not in self.char2idx:
                self.char2idx[char] = len(self.char2idx)
        self.idx2char = {v: k for k, v in self.char2idx.items()}
        return self

    def encode(self, text, add_special_tokens=True):
        """Convert text to indices"""
        if add_special_tokens:
            indices = [self.char2idx[self.sos_token]]
            indices.extend([self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text])
            indices.append(self.char2idx[self.eos_token])
        else:
            indices = [self.char2idx.get(c, self.char2idx[self.unk_token]) for c in text]
        return indices

    def decode(self, indices, skip_special_tokens=True):
        """Convert indices back to text"""
        chars = []
        for idx in indices:
            # handle both ints and tensors
            if isinstance(idx, torch.Tensor):
                idx = idx.item()
            char = self.idx2char.get(idx, self.unk_token)
            if skip_special_tokens and char in [self.pad_token, self.sos_token, self.eos_token]:
                continue
            chars.append(char)
        return ''.join(chars)

    def __len__(self):
        return len(self.char2idx)
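
# Illustrative only: an encode/decode round trip with CharTokenizer. Indices
# 0-3 are reserved for <PAD>/<SOS>/<EOS>/<UNK>; unseen characters map to <UNK>.
# Not called by the pipeline.
def _demo_char_tokenizer():
    tok = CharTokenizer().fit(["namaste", "bharat"])
    ids = tok.encode("namaste")
    print(ids)              # [1, ...character ids..., 2]
    print(tok.decode(ids))  # "namaste" (special tokens skipped)
    # 'x', 'y', 'z' were never seen during fit, so they decode as <UNK>
    print(tok.decode(tok.encode("xyz"), skip_special_tokens=False))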

# =======================
# 2. DATASET CLASS
# =======================
class TransliterationDataset(Dataset):
    """Dataset for transliteration task"""

    def __init__(self, data, src_tokenizer, tgt_tokenizer, max_len=50):
        # data: an iterable of dict-like objects with 'english word' and 'native word'
        self.data = data
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        src = item['english word']
        tgt = item['native word']
        # Encode
        src_ids = self.src_tokenizer.encode(src)
        tgt_ids = self.tgt_tokenizer.encode(tgt)
        # Truncate if needed
        src_ids = src_ids[:self.max_len]
        tgt_ids = tgt_ids[:self.max_len]
        return {
            'src_ids': torch.tensor(src_ids, dtype=torch.long),
            'tgt_ids': torch.tensor(tgt_ids, dtype=torch.long),
            'src_text': src,
            'tgt_text': tgt
        }


def collate_fn(batch):
    """Custom collate function to pad sequences"""
    src_ids = [item['src_ids'] for item in batch]
    tgt_ids = [item['tgt_ids'] for item in batch]
    # Pad sequences to the longest one in the batch (PAD id = 0)
    src_ids = nn.utils.rnn.pad_sequence(src_ids, batch_first=True, padding_value=0)
    tgt_ids = nn.utils.rnn.pad_sequence(tgt_ids, batch_first=True, padding_value=0)
    return {
        'src_ids': src_ids,
        'tgt_ids': tgt_ids,
        'src_text': [item['src_text'] for item in batch],
        'tgt_text': [item['tgt_text'] for item in batch]
    }
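
# Illustrative only: collate_fn pads a batch of variable-length sequences into
# a single (batch, max_len) tensor with PAD id 0. The token ids below are made
# up; not called by the pipeline.
def _demo_collate_fn():
    batch = [
        {'src_ids': torch.tensor([1, 5, 2]), 'tgt_ids': torch.tensor([1, 7, 8, 2]),
         'src_text': 'ab', 'tgt_text': 'अब'},
        {'src_ids': torch.tensor([1, 5, 6, 6, 2]), 'tgt_ids': torch.tensor([1, 7, 2]),
         'src_text': 'abbb', 'tgt_text': 'अ'},
    ]
    padded = collate_fn(batch)
    print(padded['src_ids'])  # shape (2, 5); first row padded with trailing zeros
    print(padded['tgt_ids'])  # shape (2, 4)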

# =======================
# 3. LSTM ENCODER-DECODER WITH ATTENTION
# =======================
class Encoder(nn.Module):
    """LSTM Encoder"""

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0,
                            bidirectional=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell


class Attention(nn.Module):
    """Bahdanau Attention Mechanism"""

    def __init__(self, dec_hidden_dim, enc_hidden_dim):
        super().__init__()
        # Encoder outputs are bidirectional, hence the enc_hidden_dim * 2
        self.attn = nn.Linear(dec_hidden_dim + enc_hidden_dim * 2, dec_hidden_dim)
        self.v = nn.Linear(dec_hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs, mask=None):
        src_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        if mask is not None:
            attention = attention.masked_fill(mask == 0, -1e10)
        attn_weights = torch.softmax(attention, dim=1)  # (batch, src_len)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)
        return context.squeeze(1), attn_weights


class Decoder(nn.Module):
    """LSTM Decoder with Attention"""

    def __init__(self, vocab_size, embed_dim, enc_hidden_dim, dec_hidden_dim, num_layers=2, dropout=0.3):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.attention = Attention(dec_hidden_dim, enc_hidden_dim)
        self.lstm = nn.LSTM(embed_dim + enc_hidden_dim * 2, dec_hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc_out = nn.Linear(dec_hidden_dim + enc_hidden_dim * 2 + embed_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.enc_hidden_dim = enc_hidden_dim
        self.dec_hidden_dim = dec_hidden_dim

    def forward(self, input, hidden, cell, encoder_outputs, mask=None):
        input = input.unsqueeze(1)
        embedded = self.dropout(self.embedding(input))
        # Attend over encoder outputs using the top decoder layer's hidden state
        context, attn_weights = self.attention(hidden[-1], encoder_outputs, mask)
        context = context.unsqueeze(1)
        rnn_input = torch.cat((embedded, context), dim=2)
        output, (hidden, cell) = self.lstm(rnn_input, (hidden, cell))
        output = output.squeeze(1)
        embedded = embedded.squeeze(1)
        context = context.squeeze(1)
        prediction = self.fc_out(torch.cat((output, context, embedded), dim=1))
        return prediction, hidden, cell, attn_weights

class Seq2Seq(nn.Module):
    """Complete Sequence-to-Sequence Model"""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        enc_hidden_dim = encoder.lstm.hidden_size
        dec_hidden_dim = decoder.dec_hidden_dim
        num_layers = decoder.lstm.num_layers
        # Project the concatenated bidirectional encoder state (2 * enc_hidden)
        # down to the decoder's hidden size
        self.hidden_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)
        self.cell_projection = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)

    def create_mask(self, src):
        mask = (src != 0)
        return mask

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        tgt_len = tgt.shape[1]
        tgt_vocab_size = self.decoder.vocab_size
        outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size).to(self.device)
        encoder_outputs, hidden, cell = self.encoder(src)
        # Merge the forward/backward directions of each layer, then project
        num_layers = self.decoder.lstm.num_layers
        hidden = hidden.view(num_layers, 2, batch_size, -1)
        cell = cell.view(num_layers, 2, batch_size, -1)
        hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2)
        cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2)
        hidden = self.hidden_projection(hidden)
        cell = self.cell_projection(cell)
        mask = self.create_mask(src)
        input = tgt[:, 0]
        for t in range(1, tgt_len):
            output, hidden, cell, attn_weights = self.decoder(
                input, hidden, cell, encoder_outputs, mask
            )
            outputs[:, t] = output
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = tgt[:, t] if teacher_force else top1
        return outputs

    def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50):
        """Translate a single source sequence (greedy decoding)"""
        self.eval()
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device)
            else:
                src_tensor = src.to(self.device)
            encoder_outputs, hidden, cell = self.encoder(src_tensor)
            num_layers = self.decoder.lstm.num_layers
            hidden = hidden.view(num_layers, 2, 1, -1)
            hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2)
            cell = cell.view(num_layers, 2, 1, -1)
            cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2)
            hidden = self.hidden_projection(hidden)
            cell = self.cell_projection(cell)
            mask = self.create_mask(src_tensor)
            input = torch.tensor([tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]]).to(self.device)
            outputs = []
            for _ in range(max_len):
                output, hidden, cell, _ = self.decoder(input, hidden, cell, encoder_outputs, mask)
                top1 = output.argmax(1)
                outputs.append(top1.item())
                if top1.item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]:
                    break
                input = top1
            return tgt_tokenizer.decode(outputs, skip_special_tokens=True)

    def beam_search_decode(self, src, src_tokenizer, tgt_tokenizer, max_len=50, beam_width=3):
        """Beam search decoding for Seq2Seq"""
        self.eval()
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(self.device)
            else:
                src_tensor = src.to(self.device)
            encoder_outputs, hidden, cell = self.encoder(src_tensor)
            num_layers = self.decoder.lstm.num_layers
            hidden = hidden.view(num_layers, 2, 1, -1)
            hidden = torch.cat((hidden[:, 0], hidden[:, 1]), dim=2)
            cell = cell.view(num_layers, 2, 1, -1)
            cell = torch.cat((cell[:, 0], cell[:, 1]), dim=2)
            hidden = self.hidden_projection(hidden)
            cell = self.cell_projection(cell)
            mask = self.create_mask(src_tensor)
            start_token = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]
            # Each beam: (sequence, cumulative log-prob, decoder hidden, decoder cell)
            beams = [(torch.tensor([start_token], device=self.device), 0.0, hidden, cell)]
            completed_sequences = []
            for _ in range(max_len):
                new_beams = []
                for seq, log_prob, h, c in beams:
                    input_token = seq[-1].unsqueeze(0)
                    output, h_new, c_new, _ = self.decoder(input_token, h, c, encoder_outputs, mask)
                    probs = torch.log_softmax(output, dim=1).squeeze(0)
                    topk_probs, topk_idx = probs.topk(beam_width)
                    for prob, idx in zip(topk_probs, topk_idx):
                        new_seq = torch.cat([seq, idx.unsqueeze(0)])
                        new_log_prob = log_prob + prob.item()
                        new_beams.append((new_seq, new_log_prob, h_new, c_new))
                new_beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]
                beams = []
                for seq, log_prob, h, c in new_beams:
                    if seq[-1].item() == tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]:
                        completed_sequences.append((seq, log_prob))
                    else:
                        beams.append((seq, log_prob, h, c))
                if not beams:
                    break
            if len(completed_sequences) == 0:
                completed_sequences = beams
            best_seq = max(completed_sequences, key=lambda x: x[1])[0]
            return tgt_tokenizer.decode(best_seq[1:], skip_special_tokens=True)
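
# Illustrative only: a tiny forward-pass shape check for the Seq2Seq model on
# CPU with made-up vocab sizes and random token ids. Not called by the pipeline.
def _demo_seq2seq_shapes():
    enc = Encoder(vocab_size=20, embed_dim=8, hidden_dim=16, num_layers=2)
    dec = Decoder(vocab_size=30, embed_dim=8, enc_hidden_dim=16, dec_hidden_dim=16, num_layers=2)
    model = Seq2Seq(enc, dec, device=torch.device('cpu'))
    src = torch.randint(1, 20, (4, 7))   # batch of 4, source length 7
    tgt = torch.randint(1, 30, (4, 9))   # target length 9
    out = model(src, tgt)
    print(out.shape)                     # torch.Size([4, 9, 30])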

# =======================
# 4. TRAINING / EVAL HELPERS
# =======================
def train_epoch(model, dataloader, optimizer, criterion, device, clip=1.0):
    model.train()
    epoch_loss = 0
    for batch in dataloader:
        src = batch['src_ids'].to(device)
        tgt = batch['tgt_ids'].to(device)
        optimizer.zero_grad()
        output = model(src, tgt)
        # Skip the <SOS> position when computing the loss
        output = output[:, 1:].reshape(-1, output.shape[-1])
        tgt_flat = tgt[:, 1:].reshape(-1)
        loss = criterion(output, tgt_flat)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(dataloader)

def char_overlap_f1(pred, true):
    """Character-level overlap F1 per word"""
    pred_counts = collections.Counter(pred)
    true_counts = collections.Counter(true)
    overlap = sum((pred_counts & true_counts).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(true)
    return 2 * precision * recall / (precision + recall)
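
# Illustrative only: char_overlap_f1 counts multiset character overlap, so
# character order does not matter. Not called by the pipeline.
def _demo_char_overlap_f1():
    print(char_overlap_f1("abc", "abc"))    # 1.0 (identical)
    print(char_overlap_f1("abc", "cab"))    # 1.0 (same characters, different order)
    print(char_overlap_f1("abcd", "abxy"))  # 0.5 (overlap 2; P = 2/4, R = 2/4)
    print(char_overlap_f1("abc", "xyz"))    # 0.0 (no overlap)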

def evaluate(model, dataloader, criterion, device):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            src = batch['src_ids'].to(device)
            tgt = batch['tgt_ids'].to(device)
            # No teacher forcing
            output = model(src, tgt, teacher_forcing_ratio=0.0)
            output = output[:, 1:].reshape(-1, output.shape[-1])
            tgt_flat = tgt[:, 1:].reshape(-1)
            loss = criterion(output, tgt_flat)
            epoch_loss += loss.item()
    return epoch_loss / len(dataloader)

# =======================
# 5. ENHANCED EVALUATION WITH FULL DETAILS
# =======================
def evaluate_with_full_details(model, dataloader, src_tokenizer, tgt_tokenizer,
                               device, output_file='test_results.jsonl',
                               decoding="greedy", beam_width=3, max_samples=None):
    """
    Evaluate model and save detailed results with all metadata
    """
    model.eval()
    results = []
    word_correct = 0
    char_correct = 0
    char_total = 0
    print(f"\n{'='*70}")
    print(f"DETAILED EVALUATION - {decoding.upper()} DECODING")
    print(f"{'='*70}\n")
    with torch.no_grad():
        sample_idx = 0
        char_f1_list = []
        for batch_idx, batch in enumerate(tqdm(dataloader, desc="Evaluating")):
            src = batch['src_ids'].to(device)
            src_texts = batch['src_text']
            tgt_texts = batch['tgt_text']
            for i in range(len(src_texts)):
                src_text = src_texts[i]
                tgt_text = tgt_texts[i]
                src_tensor = src[i].unsqueeze(0)
                # Generate prediction. Seq2Seq exposes beam_search_decode;
                # TransformerTransliterator handles both modes inside
                # translate() and, unlike Seq2Seq.translate, needs the
                # device/decoding kwargs (dispatch on its d_model attribute).
                if decoding == "beam" and hasattr(model, 'beam_search_decode'):
                    pred_text = model.beam_search_decode(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100, beam_width=beam_width
                    )
                elif hasattr(model, 'd_model'):
                    pred_text = model.translate(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100, device=device,
                        decoding=decoding, beam_width=beam_width
                    )
                else:
                    pred_text = model.translate(
                        src_tensor, src_tokenizer, tgt_tokenizer,
                        max_len=100
                    )
                pred_text = pred_text.strip()
                tgt_text = tgt_text.strip()
                src_text = src_text.strip()
                # Compute metrics
                is_correct = (pred_text == tgt_text)
                if is_correct:
                    word_correct += 1
                min_len = min(len(pred_text), len(tgt_text))
                char_matches = sum(1 for j in range(min_len) if pred_text[j] == tgt_text[j])
                char_correct += char_matches
                char_total += len(tgt_text)
                char_accuracy = char_matches / len(tgt_text) if len(tgt_text) > 0 else 0.0
                char_f1 = char_overlap_f1(pred_text, tgt_text)
                char_f1_list.append(char_f1)
                # Get original metadata from the dataset (the test loader is
                # not shuffled, so sample_idx lines up with dataset order)
                original_item = dataloader.dataset.data[sample_idx]
                source = original_item.get('source', 'Unknown')
                score = original_item.get('score', None)
                unique_id = original_item.get('unique_identifier') or f'sample_{sample_idx}'
                # Store complete result with all original fields
                result = {
                    'unique_identifier': unique_id,
                    'source': source,
                    'score': score,
                    'english_word': src_text,
                    'native_word': tgt_text,
                    'predicted_word': pred_text,
                    'is_correct': is_correct,
                    'english_length': len(src_text),
                    'native_length': len(tgt_text),
                    'predicted_length': len(pred_text),
                    'char_accuracy': char_accuracy,
                    'char_f1': char_f1,
                    'decoding_method': decoding,
                }
                if decoding == "beam":
                    result['beam_width'] = beam_width
                results.append(result)
                sample_idx += 1
                if max_samples and len(results) >= max_samples:
                    break
            if max_samples and len(results) >= max_samples:
                break
    # Create DataFrame
    results_df = pd.DataFrame(results)
    # Calculate metrics
    total_samples = len(results)
    word_accuracy = word_correct / total_samples if total_samples > 0 else 0.0
    char_accuracy = char_correct / char_total if char_total > 0 else 0.0
    char_f1 = np.mean(char_f1_list) if len(char_f1_list) > 0 else 0.0
    print("\nOVERALL METRICS:")
    print(f"  Total Samples: {total_samples:,}")
    print(f"  Word Accuracy: {word_accuracy:.4f}")
    print(f"  Char Accuracy: {char_accuracy:.4f}")
    print(f"  Char F1:       {char_f1:.4f}\n")
    # Statistics by source
    if 'source' in results_df.columns and results_df['source'].nunique() > 1:
        print("ACCURACY BY SOURCE:")
        source_stats = results_df.groupby('source').agg({
            'is_correct': ['count', 'sum', 'mean']
        }).round(4)
        source_stats.columns = ['Count', 'Correct', 'Accuracy']
        print(source_stats.to_string())
        print()
    # Statistics by length
    results_df['length_bucket'] = pd.cut(
        results_df['english_length'],
        bins=[0, 3, 6, 10, 15, 20, 100],
        labels=['1-3', '4-6', '7-10', '11-15', '16-20', '21+']
    )
    print("ACCURACY BY LENGTH:")
    length_stats = results_df.groupby('length_bucket').agg({
        'is_correct': ['count', 'mean']
    }).round(4)
    length_stats.columns = ['Count', 'Accuracy']
    print(length_stats.to_string())
    print()
    # Show some examples
    print("SAMPLE PREDICTIONS:")
    for idx in range(min(10, len(results_df))):
        row = results_df.iloc[idx]
        ok = "✓" if row['is_correct'] else "✗"
        print(f"{ok} {row['english_word']} -> {row['predicted_word']} (expected: {row['native_word']})")
    print()
    # Save results
    os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else '.', exist_ok=True)
    results_df.to_json(output_file, orient='records', lines=True, force_ascii=False)
    print(f"✅ Results saved to: {output_file}\n")
    metrics = {
        'total_samples': total_samples,
        'word_correct': word_correct,
        'word_accuracy': word_accuracy,
        'char_accuracy': char_accuracy,
        'char_f1': char_f1,
        'decoding_method': decoding
    }
    if decoding == "beam":
        metrics['beam_width'] = beam_width
    return results_df, metrics

# =======================
# 10. TRANSFORMER MODEL
# =======================
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding"""

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)
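
# Illustrative only: positional encoding adds a (1, seq_len, d_model) sinusoid
# table to the input embeddings; the input shape is preserved. Not called by
# the pipeline.
def _demo_positional_encoding():
    pe = PositionalEncoding(d_model=16, max_len=50, dropout=0.0)
    x = torch.zeros(2, 10, 16)  # (batch, seq_len, d_model)
    out = pe(x)
    print(out.shape)            # torch.Size([2, 10, 16])
    print(out[0, 0, :4])        # position 0 alternates sin(0)/cos(0): [0, 1, 0, 1]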

class TransformerTransliterator(nn.Module):
    def __init__(self,
                 src_vocab_size,
                 tgt_vocab_size,
                 d_model=256,
                 nhead=8,
                 num_encoder_layers=2,
                 num_decoder_layers=2,
                 dim_feedforward=512,
                 dropout=0.1,
                 max_len=100):
        super().__init__()
        self.d_model = d_model
        self.src_vocab_size = src_vocab_size
        self.tgt_vocab_size = tgt_vocab_size
        # Embeddings
        self.src_embedding = nn.Embedding(src_vocab_size, d_model, padding_idx=0)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model, padding_idx=0)
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_len, dropout)
        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        # Output layer
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self._init_weights()

    def _init_weights(self):
        initrange = 0.1
        self.src_embedding.weight.data.uniform_(-initrange, initrange)
        self.tgt_embedding.weight.data.uniform_(-initrange, initrange)
        self.fc_out.bias.data.zero_()
        self.fc_out.weight.data.uniform_(-initrange, initrange)
    def generate_square_subsequent_mask(self, sz):
        """Causal mask for decoder: 0.0 on/below the diagonal, -inf above"""
        mask = torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)
        return mask

    def create_padding_mask(self, seq, pad_idx=0):
        """Mask for padding tokens (True where padded)"""
        return (seq == pad_idx)
    def forward(self, src, tgt):
        tgt_len = tgt.shape[1]
        tgt_mask = self.generate_square_subsequent_mask(tgt_len).to(tgt.device)
        src_padding_mask = self.create_padding_mask(src)
        tgt_padding_mask = self.create_padding_mask(tgt)
        src_emb = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model))
        output = self.transformer(
            src_emb,
            tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask
        )
        output = self.fc_out(output)
        return output
    def translate(self, src, src_tokenizer, tgt_tokenizer, max_len=50, device='cpu',
                  decoding="greedy", beam_width=3):
        """Greedy or Beam Search decoding"""
        self.eval()
        with torch.no_grad():
            if isinstance(src, str):
                src_ids = src_tokenizer.encode(src)
                src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(device)
            else:
                src_tensor = src.to(device)
            sos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.sos_token]
            eos_idx = tgt_tokenizer.char2idx[tgt_tokenizer.eos_token]
            if decoding == "beam":
                # Initialize beams: list of (sequence, score)
                beams = [([sos_idx], 0.0)]
                for _ in range(max_len):
                    new_beams = []
                    for seq, score in beams:
                        # Carry finished hypotheses through unchanged so they
                        # are not extended past <EOS>
                        if seq[-1] == eos_idx:
                            new_beams.append((seq, score))
                            continue
                        tgt_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
                        output = self.forward(src_tensor, tgt_tensor)
                        logits = output[0, -1, :]
                        probs = torch.log_softmax(logits, dim=-1)
                        topk_probs, topk_indices = probs.topk(beam_width)
                        for k in range(beam_width):
                            next_seq = seq + [topk_indices[k].item()]
                            next_score = score + topk_probs[k].item()
                            new_beams.append((next_seq, next_score))
                    # Keep top beam_width sequences
                    beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]
                    # Stop if all beams have ended
                    if all(seq[-1] == eos_idx for seq, _ in beams):
                        break
                best_seq = beams[0][0]
                return tgt_tokenizer.decode(best_seq, skip_special_tokens=True)
            else:
                # Greedy decoding
                tgt_indices = [sos_idx]
                for _ in range(max_len):
                    tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device)
                    output = self.forward(src_tensor, tgt_tensor)
                    next_token = output[0, -1, :].argmax().item()
                    tgt_indices.append(next_token)
                    if next_token == eos_idx:
                        break
                return tgt_tokenizer.decode(tgt_indices, skip_special_tokens=True)
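
# Illustrative only: the causal mask keeps position i from attending to
# positions > i; the -inf entries are added to attention scores before the
# softmax. Hyperparameters here are made up; not called by the pipeline.
def _demo_causal_mask():
    model = TransformerTransliterator(src_vocab_size=10, tgt_vocab_size=10, d_model=32, nhead=4)
    print(model.generate_square_subsequent_mask(4))
    # tensor([[0., -inf, -inf, -inf],
    #         [0.,   0., -inf, -inf],
    #         [0.,   0.,   0., -inf],
    #         [0.,   0.,   0.,   0.]])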

def train_transformer_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for batch in dataloader:
        src = batch['src_ids'].to(device)
        tgt = batch['tgt_ids'].to(device)
        # Shifted targets: the decoder sees tgt[:-1] and predicts tgt[1:]
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        optimizer.zero_grad()
        output = model(src, tgt_input)
        loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)


def evaluate_transformer(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            src = batch['src_ids'].to(device)
            tgt = batch['tgt_ids'].to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]
            output = model(src, tgt_input)
            loss = criterion(output.reshape(-1, output.shape[-1]), tgt_output.reshape(-1))
            total_loss += loss.item()
    return total_loss / len(dataloader)
| if __name__ == "__main__": | |
| # 1️⃣ Download and extract Hindi Aksharantar | |
| url = "https://huggingface.co/datasets/ai4bharat/Aksharantar/resolve/main/hin.zip" | |
| resp = requests.get(url) | |
| resp.raise_for_status() | |
| tmpdir = tempfile.mkdtemp() | |
| with zipfile.ZipFile(io.BytesIO(resp.content), "r") as zip_ref: | |
| zip_ref.extractall(tmpdir) | |
| print("Extracted:", os.listdir(tmpdir)) | |
| clean_dir = os.path.join(tmpdir, "cleaned") | |
| os.makedirs(clean_dir, exist_ok=True) | |
| for split in ["train", "valid", "test"]: | |
| clean_json_file( | |
| os.path.join(tmpdir, f"hin_{split}.json"), | |
| os.path.join(clean_dir, f"{split}.json") | |
| ) | |
| # 3️⃣ Load cleaned dataset | |
| dataset = load_dataset( | |
| "json", | |
| data_files={ | |
| "train": os.path.join(clean_dir, "train.json"), | |
| "validation": os.path.join(clean_dir, "valid.json"), | |
| "test": os.path.join(clean_dir, "test.json"), | |
| }, | |
| ) | |
| print(dataset) | |
| print(dataset["train"][0]) | |
| # Analyze BEFORE sampling | |
| print("\n" + "="*80) | |
| print("DATA ANALYSIS - BEFORE SAMPLING") | |
| print("="*80) | |
| train_stats_before = analyze_dataset_statistics(dataset['train'], 'train (before sampling)') | |
| valid_stats = analyze_dataset_statistics(dataset['validation'], 'validation') | |
| test_stats = analyze_dataset_statistics(dataset['test'], 'test') | |
| # Save statistics | |
| os.makedirs("analysis", exist_ok=True) | |
| train_stats_before.to_csv('analysis/data_stats_train_before_sampling.csv', index=False) | |
| valid_stats.to_csv('analysis/data_stats_valid.csv', index=False) | |
| test_stats.to_csv('analysis/data_stats_test.csv', index=False) | |
| full_dataset = list(dataset['train']) | |
| dataset['train'] = sample_transliteration_dataset(full_dataset, sample_size=100000, top_freq_ratio=0.3) | |
| print(f"\n✅ Sampled dataset size: {len(dataset['train'])}") | |
| print("Example entries:", dataset['train'][:5]) | |
| # Analyze AFTER sampling | |
| print("\n" + "="*80) | |
| print("DATA ANALYSIS - AFTER SAMPLING") | |
| print("="*80) | |
| train_stats_after = analyze_dataset_statistics(dataset['train'], 'train (after sampling)') | |
| train_stats_after.to_csv('analysis/data_stats_train_after_sampling.csv', index=False) | |

    # =======================
    # 6. TOKENIZERS + DATALOADERS
    # =======================
    src_tokenizer = CharTokenizer()
    tgt_tokenizer = CharTokenizer()
    # Fit tokenizers on the training set
    train_items = list(dataset['train'])
    valid_items = list(dataset['validation'])
    test_items = list(dataset['test'])
    # Build tokenizer vocab from train split
    src_texts = [item['english word'] for item in train_items]
    tgt_texts = [item['native word'] for item in train_items]
    src_tokenizer.fit(src_texts)
    tgt_tokenizer.fit(tgt_texts)
    print(f"\nSource vocab size: {len(src_tokenizer)}")
    print(f"Target vocab size: {len(tgt_tokenizer)}")
    train_split = train_items
    valid_split = valid_items
    test_split = test_items
    train_dataset = TransliterationDataset(train_split, src_tokenizer, tgt_tokenizer)
    valid_dataset = TransliterationDataset(valid_split, src_tokenizer, tgt_tokenizer)
    test_dataset = TransliterationDataset(test_split, src_tokenizer, tgt_tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, collate_fn=collate_fn)
    valid_loader = DataLoader(valid_dataset, batch_size=128, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

    # =======================
    # 7. LSTM MODEL / TRAINING SETUP
    # =======================
    EMBED_DIM = 256
    ENC_HIDDEN_DIM = 256
    DEC_HIDDEN_DIM = 256
    NUM_LAYERS_MODEL = 2
    DROPOUT = 0.3
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"\nUsing device: {device}")
    encoder = Encoder(len(src_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT)
    decoder = Decoder(len(tgt_tokenizer), EMBED_DIM, ENC_HIDDEN_DIM, DEC_HIDDEN_DIM, NUM_LAYERS_MODEL, DROPOUT)
    model = Seq2Seq(encoder, decoder, device).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # =======================
    # 8. LSTM TRAINING LOOP
    # =======================
    print("\n" + "="*80)
    print("TRAINING LSTM MODEL")
    print("="*80 + "\n")
    NUM_EPOCHS = 10
    for epoch in range(NUM_EPOCHS):
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
        valid_loss = evaluate(model, valid_loader, criterion, device)
        print(f'Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}')
    torch.save({
        'model_state_dict': model.state_dict(),
        'src_vocab': src_tokenizer.char2idx,
        'tgt_vocab': tgt_tokenizer.char2idx,
    }, 'lstm_transliterator.pt')

    # =======================
    # 9. LSTM EVALUATION WITH FULL DETAILS
    # =======================
    print("\n" + "="*80)
    print("LSTM MODEL EVALUATION")
    print("="*80)
    # Greedy decoding
    greedy_results_lstm, greedy_metrics_lstm = evaluate_with_full_details(
        model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/lstm_test_results_greedy.jsonl',
        decoding='greedy'
    )
    # Beam search decoding
    beam_results_lstm, beam_metrics_lstm = evaluate_with_full_details(
        model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/lstm_test_results_beam.jsonl',
        decoding='beam',
        beam_width=5
    )
    test_words = ['namaste', 'dhanyavaad', 'bharat']
    print("\nQuick manual checks (LSTM):")
    for word in test_words:
        print(f"{word} -> {model.translate(word, src_tokenizer, tgt_tokenizer)}")

    # =======================
    # 11. TRANSFORMER TRAINING
    # =======================
    print("\n" + "="*80)
    print("TRAINING TRANSFORMER MODEL")
    print("="*80 + "\n")
    D_MODEL = 256
    NHEAD = 8
    NUM_ENCODER_LAYERS = 2
    NUM_DECODER_LAYERS = 2
    DIM_FEEDFORWARD = 512
    DROPOUT = 0.1
    transformer_model = TransformerTransliterator(
        src_vocab_size=len(src_tokenizer),
        tgt_vocab_size=len(tgt_tokenizer),
        d_model=D_MODEL,
        nhead=NHEAD,
        num_encoder_layers=NUM_ENCODER_LAYERS,
        num_decoder_layers=NUM_DECODER_LAYERS,
        dim_feedforward=DIM_FEEDFORWARD,
        dropout=DROPOUT
    ).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=0.1)
    optimizer = torch.optim.Adam(transformer_model.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
    NUM_EPOCHS = 10
    for epoch in range(NUM_EPOCHS):
        train_loss = train_transformer_epoch(transformer_model, train_loader, optimizer, criterion, device)
        valid_loss = evaluate_transformer(transformer_model, valid_loader, criterion, device)
        scheduler.step(valid_loss)
        print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}")
    torch.save({
        'model_state_dict': transformer_model.state_dict(),
        'src_vocab': src_tokenizer.char2idx,
        'tgt_vocab': tgt_tokenizer.char2idx,
    }, 'transformer_transliterator.pt')

    # =======================
    # 12. TRANSFORMER EVALUATION WITH FULL DETAILS
    # =======================
    print("\n" + "="*80)
    print("TRANSFORMER MODEL EVALUATION")
    print("="*80)
    # Greedy decoding
    greedy_results_transformer, greedy_metrics_transformer = evaluate_with_full_details(
        transformer_model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/transformer_test_results_greedy.jsonl',
        decoding='greedy'
    )
    # Beam search decoding
    beam_results_transformer, beam_metrics_transformer = evaluate_with_full_details(
        transformer_model, test_loader,
        src_tokenizer, tgt_tokenizer,
        device=device,
        output_file='analysis/transformer_test_results_beam.jsonl',
        decoding='beam',
        beam_width=5
    )
    test_words = ['namaste', 'dhanyavaad', 'bharat', 'mumbai', 'hindustan']
    print("\nTest Translations (Transformer):")
    for word in test_words:
        translated = transformer_model.translate(word, src_tokenizer, tgt_tokenizer, device=device)
        print(f"{word} -> {translated}")

    # =======================
    # 13. FINAL SUMMARY
    # =======================
    print("\n" + "="*80)
    print("TRAINING COMPLETE - SUMMARY")
    print("="*80 + "\n")
    print("LSTM MODEL:")
    print(f"  Greedy  - Word Acc: {greedy_metrics_lstm['word_accuracy']:.4f}, Char F1: {greedy_metrics_lstm['char_f1']:.4f}, Char Acc: {greedy_metrics_lstm['char_accuracy']:.4f}")
    print(f"  Beam(5) - Word Acc: {beam_metrics_lstm['word_accuracy']:.4f}, Char F1: {beam_metrics_lstm['char_f1']:.4f}, Char Acc: {beam_metrics_lstm['char_accuracy']:.4f}")
    print("\nTRANSFORMER MODEL:")
    print(f"  Greedy  - Word Acc: {greedy_metrics_transformer['word_accuracy']:.4f}, Char F1: {greedy_metrics_transformer['char_f1']:.4f}, Char Acc: {greedy_metrics_transformer['char_accuracy']:.4f}")
    print(f"  Beam(5) - Word Acc: {beam_metrics_transformer['word_accuracy']:.4f}, Char F1: {beam_metrics_transformer['char_f1']:.4f}, Char Acc: {beam_metrics_transformer['char_accuracy']:.4f}")
    print("\n📁 OUTPUT FILES:")
    print("  Data Statistics:")
    print("    - analysis/data_stats_train_before_sampling.csv")
    print("    - analysis/data_stats_train_after_sampling.csv")
    print("    - analysis/data_stats_valid.csv")
    print("    - analysis/data_stats_test.csv")
    print("  Model Checkpoints:")
    print("    - lstm_transliterator.pt")
    print("    - transformer_transliterator.pt")
    print("  Test Results (with full metadata):")
    print("    - analysis/lstm_test_results_greedy.jsonl")
    print("    - analysis/lstm_test_results_beam.jsonl")
    print("    - analysis/transformer_test_results_greedy.jsonl")
    print("    - analysis/transformer_test_results_beam.jsonl")
    print("\n✅ All done! Check the analysis/ directory for detailed results.")
    print("="*80)