import torch from torch import nn from torch.utils.data import Dataset, DataLoader, RandomSampler from torch.optim import AdamW from transformers import BertJapaneseTokenizer, BertModel from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score import pandas as pd from tqdm import tqdm import os # osモジュールをインポート import numpy as np try: import matplotlib matplotlib.use('Agg') # GUIバックエンドを使わない import matplotlib.pyplot as plt MATPLOTLIB_AVAILABLE = True except ImportError: MATPLOTLIB_AVAILABLE = False print("警告: matplotlibがインストールされていません。グラフは表示されません。") from DataNLP import load_preprocessed_data # --- スクリプトのディレクトリを基準にパスを設定 --- # このスクリプト自身の絶対パスを取得 script_dir = os.path.dirname(os.path.abspath(__file__)) # 作業ディレクトリをこのスクリプトがあるディレクトリに変更 os.chdir(script_dir) # 設定 PRE_TRAINED_MODEL_NAME = 'cl-tohoku/bert-large-japanese' MAX_LEN = 128 BATCH_SIZE = 32 # バッチサイズを増加して高速化 EPOCHS = 10 # 重みを大幅に更新するためエポック数を増加 DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") NUM_AGE_CLASSIFIERS = 6 # 各年代ごとに二値分類器 AGE_CATEGORIES = ["10代", "20代", "30代", "40代", "50代", "60代"] # --- データセットクラス --- class CustomDataset(Dataset): def __init__(self, texts, age_labels_dict, tokenizer, max_len): """ age_labels_dict: {'10代_label': array, '20代_label': array, ...} """ self.texts = texts self.age_labels_dict = age_labels_dict self.tokenizer = tokenizer self.max_len = max_len def __len__(self): return len(self.texts) def __getitem__(self, item): text = str(self.texts[item]) encoding = self.tokenizer.encode_plus( text, add_special_tokens=True, max_length=self.max_len, return_token_type_ids=False, padding='max_length', truncation=True, return_attention_mask=True, return_tensors='pt', ) # 各年代の二値ラベルを取得 age_labels = torch.tensor([ int(self.age_labels_dict[f"{age}_label"][item]) for age in AGE_CATEGORIES ], dtype=torch.float) return { 'input_ids': encoding['input_ids'].flatten(), 'attention_mask': encoding['attention_mask'].flatten(), 'age_labels': age_labels, # shape: (6,) - 各年代の二値ラベル } # --- モデル定義 --- class BertForAgeClassification(nn.Module): def __init__(self, model_name, num_age_classifiers): super().__init__() self.bert = BertModel.from_pretrained(model_name, use_safetensors=True) self.dropout = nn.Dropout(self.bert.config.hidden_dropout_prob) # 各年代ごとに二値分類器を作成(6個) self.age_classifiers = nn.ModuleList([ nn.Linear(self.bert.config.hidden_size, 1) # 二値分類なので出力は1 for _ in range(num_age_classifiers) ]) def forward(self, input_ids, attention_mask, age_labels=None): outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask) pooled_output = outputs.pooler_output pooled_output = self.dropout(pooled_output) # 各年代の二値分類器の出力を取得 age_logits_list = [classifier(pooled_output) for classifier in self.age_classifiers] age_logits = torch.cat(age_logits_list, dim=1) # shape: (batch_size, 6) loss = None if age_labels is not None: # 各年代の二値分類損失(BCEWithLogitsLoss) bce_loss = nn.BCEWithLogitsLoss() loss = bce_loss(age_logits, age_labels) return loss, age_logits # --- 学習関数 --- def train_epoch(model, data_loader, optimizer, device): model.train() total_loss = 0 for batch in tqdm(data_loader, desc="Training"): input_ids = batch['input_ids'].to(device) attention_mask = batch['attention_mask'].to(device) age_labels = batch['age_labels'].to(device) optimizer.zero_grad() # モデルのforward関数から出力を取得 loss, age_logits = model(input_ids=input_ids, attention_mask=attention_mask, age_labels=age_labels) if loss is not None: loss.backward() optimizer.step() total_loss += loss.item() return total_loss / len(data_loader) # --- 評価関数 --- def eval_model(model, data_loader, device): model.eval() age_preds_all = {age: [] for age in AGE_CATEGORIES} # 各年代の予測 age_true_all = {age: [] for age in AGE_CATEGORIES} # 各年代の正解 with torch.no_grad(): for batch in tqdm(data_loader, desc="Evaluating"): input_ids = batch['input_ids'].to(device) attention_mask = batch['attention_mask'].to(device) _, age_logits = model(input_ids=input_ids, attention_mask=attention_mask) # 各年代の二値分類の予測(シグモイド関数で0-1に変換後、0.5で閾値判定) age_probs = torch.sigmoid(age_logits) # shape: (batch_size, 6) age_preds_binary = (age_probs > 0.5).cpu().numpy() # shape: (batch_size, 6) age_true_binary = batch['age_labels'].cpu().numpy() # shape: (batch_size, 6) # 各年代ごとに予測と正解を保存 for i, age in enumerate(AGE_CATEGORIES): age_preds_all[age].extend(age_preds_binary[:, i]) age_true_all[age].extend(age_true_binary[:, i]) # 各年代の精度を計算 age_accuracies = {} for age in AGE_CATEGORIES: age_accuracies[age] = accuracy_score(age_true_all[age], age_preds_all[age]) return age_accuracies # --- 学習曲線表示関数 --- def plot_training_curves(train_losses, val_accuracies): """ 学習曲線(Loss CurveとAccuracy Curve)を表示する """ if not MATPLOTLIB_AVAILABLE: print("matplotlibが利用できないため、グラフを表示できません。") return epochs = range(1, len(train_losses) + 1) # 2つのサブプロットを作成 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5)) # Loss Curve ax1.plot(epochs, train_losses, 'b-', label='Training Loss', linewidth=2) ax1.set_title('Training Loss Curve', fontsize=14, fontweight='bold') ax1.set_xlabel('Epoch') ax1.set_ylabel('Loss') ax1.grid(True, alpha=0.3) ax1.legend() # Accuracy Curve colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown'] for i, age in enumerate(AGE_CATEGORIES): ax2.plot(epochs, val_accuracies[age], color=colors[i], label=f'{age} Accuracy', linewidth=2, marker='o', markersize=4) ax2.set_title('Validation Accuracy Curves', fontsize=14, fontweight='bold') ax2.set_xlabel('Epoch') ax2.set_ylabel('Accuracy') ax2.set_ylim(0, 1) ax2.grid(True, alpha=0.3) ax2.legend(bbox_to_anchor=(1.05, 1), loc='upper left') plt.tight_layout() plt.savefig('age_training_curves.png', dpi=300, bbox_inches='tight') plt.show() # 最終的な精度を表示 print("\n=== 最終的な検証精度 ===") for age in AGE_CATEGORIES: final_acc = val_accuracies[age][-1] print(f"{age}: {final_acc:.4f} ({final_acc*100:.2f}%)") avg_acc = np.mean([val_accuracies[age][-1] for age in AGE_CATEGORIES]) print(f"\n平均精度: {avg_acc:.4f} ({avg_acc*100:.2f}%)") # --- データサンプリング関数(年代と性別を別々にバランシング) --- def sample_balanced_data(df, max_per_age=5000, max_per_gender=5000): """ 年代と性別を別々にバランシングする - 年代:各年代ごとに最大max_per_age件(性別関係なく) - 性別:各性別ごとに最大max_per_gender件(年代関係なく) 両方の条件を満たすデータのみを残す """ # 年代ごとにサンプリング age_sampled_dfs = [] for age in AGE_CATEGORIES: subset = df[df['年代'] == age] if len(subset) > max_per_age: subset = subset.sample(max_per_age, random_state=42) age_sampled_dfs.append(subset) age_balanced_df = pd.concat(age_sampled_dfs).reset_index(drop=True) # 性別ごとにサンプリング gender_sampled_dfs = [] for gender_label in age_balanced_df['性別_label'].unique(): subset = age_balanced_df[age_balanced_df['性別_label'] == gender_label] if len(subset) > max_per_gender: subset = subset.sample(max_per_gender, random_state=42) gender_sampled_dfs.append(subset) return pd.concat(gender_sampled_dfs).sample(frac=1, random_state=42).reset_index(drop=True) def create_balanced_binary_labels(df, samples_per_label=2000): """ 各年代の二値分類器を完全に独立させてバランスを取る - 各年代について、正例と負例を同じ数にする(重複なし) - samples_per_label: 各ラベル(正例・負例)あたりのサンプル数 """ # まず、各年代ごとに利用可能なデータ数を確認 print("\n各年代のデータ数:") for age in AGE_CATEGORIES: count = len(df[df['年代'] == age]) print(f" {age}: {count}件") # 各年代用のデータセットを個別に作成 age_datasets = {} for age in AGE_CATEGORIES: print(f"\n {age}の二値分類器用データを作成中...") # 正例(該当年代)のデータ positive_samples = df[df['年代'] == age].copy() actual_positive = min(len(positive_samples), samples_per_label) if len(positive_samples) > samples_per_label: positive_samples = positive_samples.sample(samples_per_label, random_state=42) else: print(f" 警告: {age}の正例は{len(positive_samples)}件しかありません") # 負例(他の年代)のデータ - 正例と同じ数だけサンプリング negative_samples = df[df['年代'] != age].copy() target_negative = len(positive_samples) # 正例と同じ数 if len(negative_samples) > target_negative: negative_samples = negative_samples.sample(target_negative, random_state=42) # 正例と負例を結合(この年代専用) age_dataset = pd.concat([positive_samples, negative_samples]).reset_index(drop=True) age_datasets[age] = age_dataset print(f" {age}: 正例{len(positive_samples)}件, 負例{len(negative_samples)}件 (合計{len(age_dataset)}件)") # 全ての年代のデータセットを結合してシャッフル # ※各データは複数の年代の分類器で使われるが、各分類器内ではバランスが取れている all_data = [] for age, dataset in age_datasets.items(): all_data.append(dataset) # インデックスで重複を除去(リスト型カラムがあるためdrop_duplicatesは使えない) final_df = pd.concat(all_data, ignore_index=True) final_df = final_df.loc[~final_df.index.duplicated(keep='first')] final_df = final_df.sample(frac=1, random_state=42).reset_index(drop=True) print(f"\n統合後のデータ数: {len(final_df)}件") return final_df # --- メイン処理 --- def main(): print("--- 1. データ読み込み ---") df, _ = load_preprocessed_data() # --- 各年代の二値分類でバランスを取る --- print("--- 各年代の二値分類でバランス調整 ---") df = create_balanced_binary_labels(df, samples_per_label=2000) # 各ラベル2000件ずつ # ラベルの分布を確認 print("\n各年代の二値ラベル分布(バランス調整後):") for age in AGE_CATEGORIES: positive_count = df[f"{age}_label"].sum() negative_count = len(df) - positive_count print(f" {age}: 正例{positive_count}件, 負例{negative_count}件") print(f"\n合計データ数: {len(df)} 件") # 訓練用と検証用に分割 train_df, val_df = train_test_split(df, test_size=0.2, random_state=42) print(f"\n--- 2. トークナイザとデータローダーの準備 ---") tokenizer = BertJapaneseTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME) # 各年代のラベルを辞書形式で渡す train_age_labels_dict = {f"{age}_label": train_df[f"{age}_label"].values for age in AGE_CATEGORIES} val_age_labels_dict = {f"{age}_label": val_df[f"{age}_label"].values for age in AGE_CATEGORIES} train_dataset = CustomDataset( train_df['text'].values, train_age_labels_dict, tokenizer, MAX_LEN ) train_sampler = RandomSampler(train_dataset) train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=train_sampler) val_dataset = CustomDataset( val_df['text'].values, val_age_labels_dict, tokenizer, MAX_LEN ) val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE) print("\n--- 3. モデルのセットアップ ---") model = BertForAgeClassification(PRE_TRAINED_MODEL_NAME, NUM_AGE_CLASSIFIERS) model.to(DEVICE) # BERT全体をファインチューニング(レイヤーごとに異なる学習率を設定) optimizer = AdamW([ {'params': model.bert.parameters(), 'lr': 2e-5}, # BERT本体は小さい学習率 {'params': model.age_classifiers.parameters(), 'lr': 5e-4}, # 分類層は大きい学習率 ]) print("\n--- 4. 学習開始 ---") print(f"デバイス: {DEVICE}") print(f"訓練データ数: {len(train_df)} 件") print(f"検証データ数: {len(val_df)} 件") print(f"バッチサイズ: {BATCH_SIZE}") print(f"エポック数: {EPOCHS}") print(f"推定学習時間: 約66時間") # 学習履歴を保存するリスト train_losses = [] val_accuracies = {age: [] for age in AGE_CATEGORIES} import time start_time = time.time() for epoch in range(EPOCHS): epoch_start_time = time.time() print(f"\n{'='*60}") print(f"Epoch {epoch + 1}/{EPOCHS} 開始") print(f"{'='*60}") train_loss = train_epoch(model, train_loader, optimizer, DEVICE) print(f"Train Loss: {train_loss:.4f}") # 学習損失を記録 train_losses.append(train_loss) age_accuracies = eval_model(model, val_loader, DEVICE) print("\nAge Validation Accuracies:") for age in AGE_CATEGORIES: print(f" {age}: {age_accuracies[age]:.4f} ({age_accuracies[age]*100:.2f}%)") val_accuracies[age].append(age_accuracies[age]) # 平均精度を計算 avg_acc = sum(age_accuracies.values()) / len(age_accuracies) print(f"\n平均精度: {avg_acc:.4f} ({avg_acc*100:.2f}%)") # エポックの経過時間を表示 epoch_time = time.time() - epoch_start_time elapsed_time = time.time() - start_time remaining_epochs = EPOCHS - (epoch + 1) estimated_remaining_time = (elapsed_time / (epoch + 1)) * remaining_epochs print(f"\nエポック所要時間: {epoch_time/60:.1f}分") print(f"経過時間: {elapsed_time/3600:.1f}時間") print(f"推定残り時間: {estimated_remaining_time/3600:.1f}時間") print(f"{'='*60}") print("\n--- 5. 学習完了 ---") torch.save(model.state_dict(), 'bert_age_model.bin') print("モデルを 'bert_age_model.bin' に保存しました。") # Loss CurveとAccuracy Curveを表示 print("\n--- 6. 学習曲線の表示 ---") plot_training_curves(train_losses, val_accuracies) if __name__ == '__main__': main()