Guilherme
Add Precision/Recall: manual & CSV tabs; batch core; BERTScore+ROUGE
2e6da1f
# metrics/bertscore.py
"""
BERTScore helpers: scorer init, single and batch computation.
Adds precision/recall alongside F1 (UI shows F1; CSV export includes P/R too).
"""
from functools import lru_cache

import pandas as pd
from bert_score import BERTScorer
from transformers import AutoConfig, AutoTokenizer

from utils.file_utils import extract_sections, has_sections
# manual layer counts (fallback when the HF config can't be loaded or lacks num_hidden_layers)
_MANUAL_BERT_LAYERS = {
    "neuralmind/bert-base-portuguese-cased": 12,
    "pucpr/biobertpt-clin": 12,
    "xlm-roberta-large": 24,
    "medicalai/ClinicalBERT": 12,
}
# friendly label ↔ model id mapping
BERT_FRIENDLY_TO_MODEL = {
    "Portuguese (Br) Bert": "neuralmind/bert-base-portuguese-cased",
    "Portuguese (Br) Clinical BioBert": "pucpr/biobertpt-clin",
    "Multilingual Bert (XLM-RoBERTa)": "xlm-roberta-large",
    "ClinicalBERT (medicalai)": "medicalai/ClinicalBERT",
}
BERT_MODEL_TO_FRIENDLY = {v: k for k, v in BERT_FRIENDLY_TO_MODEL.items()}
_USE_RESCALE_BASELINE = False  # keep raw scores; baseline rescaling changes the score range
def _safe_num_layers(model_type: str) -> int | None:
    # try to read the layer count from the HF config; fall back to the manual map
    try:
        cfg = AutoConfig.from_pretrained(model_type)
        if hasattr(cfg, "num_hidden_layers") and isinstance(cfg.num_hidden_layers, int):
            return cfg.num_hidden_layers
    except Exception:
        pass
    return _MANUAL_BERT_LAYERS.get(model_type)
@lru_cache(maxsize=6)
def get_bertscore_scorer(model_type: str):
    # lang is only consulted for baseline rescaling, which is disabled above
    lang = "pt" if any(model_type.startswith(p) for p in ("neuralmind", "pucpr")) else None
    # bert_score treats num_layers as *which* hidden layer to embed with;
    # passing the model's layer count selects the top layer
    num_layers = _safe_num_layers(model_type)
    kwargs = {"lang": lang, "rescale_with_baseline": _USE_RESCALE_BASELINE}
    if num_layers is not None:
        kwargs["num_layers"] = num_layers
    return BERTScorer(model_type=model_type, **kwargs)
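# Usage sketch (weights download on first call; the lru_cache reuses the scorer):
#     scorer = get_bertscore_scorer("neuralmind/bert-base-portuguese-cased")
#     P, R, F1 = scorer.score(["texto gerado"], ["texto de referência"])
#     # P, R, F1 are 1-element tensors, one entry per candidate/reference pair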
def chunk_text_with_stride(text: str, tokenizer, max_len: int = 512, stride: int = 50):
    """Split text into windows of at most max_len tokens, overlapping by stride tokens."""
    ids = tokenizer.encode(text, add_special_tokens=True)
    if len(ids) <= max_len:
        return [tokenizer.decode(ids, skip_special_tokens=True)]
    chunks, step = [], max(1, max_len - stride)  # guard against stride >= max_len
    for i in range(0, len(ids), step):
        subset = ids[i : i + max_len]
        if not subset:
            break
        chunks.append(tokenizer.decode(subset, skip_special_tokens=True))
        if i + max_len >= len(ids):
            break
    return chunks
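# Worked example (token counts are illustrative): with max_len=512 and stride=50,
# the window advances by step = 512 - 50 = 462 tokens, so consecutive chunks
# overlap by 50 tokens. A 1000-token text yields windows starting at 0, 462 and
# 924 — ids[0:512], ids[462:974], ids[924:1000] — and the loop stops once a
# window reaches the end of the ids.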
def bertscore_prec_rec_f1(reference: str, prediction: str, model_type: str):
    """
    Return (precision, recall, f1) for a single reference/prediction pair.
    Long texts are chunked; scores are averaged over aligned chunk pairs.
    On error, returns (None, None, None).
    """
    if not reference or not prediction:
        return (None, None, None)
    try:
        scorer = get_bertscore_scorer(model_type)
        tokenizer = AutoTokenizer.from_pretrained(model_type, use_fast=True)
        gen_chunks = chunk_text_with_stride(prediction, tokenizer)
        ref_chunks = chunk_text_with_stride(reference, tokenizer)
        # zip pairs chunks positionally and drops the longer text's tail chunks
        paired = list(zip(gen_chunks, ref_chunks))
        if not paired:
            return (0.0, 0.0, 0.0)
        # one batched call scores every chunk pair at once
        P, R, F1 = scorer.score([gc for gc, _ in paired], [rc for _, rc in paired])
        return (float(P.mean()), float(R.mean()), float(F1.mean()))
    except Exception:
        return (None, None, None)
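# Usage sketch (illustrative strings; model ids are the values of BERT_FRIENDLY_TO_MODEL):
#     p, r, f1 = bertscore_prec_rec_f1("nota de referência", "nota gerada", "pucpr/biobertpt-clin")
#     # each value is an unrescaled BERTScore float, or None if scoring failed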
def compute_bertscore_single(reference: str, prediction: str, model_type: str, per_section: bool = False):
    """
    If per_section=False: returns the global F1 (float, 0..1) or None on error.
    If per_section=True: returns a dict with keys:
        - bertscore_global_{p,r,f1}
        - bertscore_{S,O,A,P}_{p,r,f1} (None when either text lacks that section)
    """
    if not reference or not prediction:
        return None if not per_section else {}
    try:
        scorer = get_bertscore_scorer(model_type)
        tokenizer = AutoTokenizer.from_pretrained(model_type, use_fast=True)

        def avg_scores(pred_text, ref_text):
            # chunk both texts, score the aligned chunk pairs in one batch,
            # and average each metric; fall back to zeros if scoring fails
            paired = list(zip(
                chunk_text_with_stride(pred_text, tokenizer),
                chunk_text_with_stride(ref_text, tokenizer),
            ))
            if not paired:
                return 0.0, 0.0, 0.0
            try:
                P, R, F1 = scorer.score([pc for pc, _ in paired], [rc for _, rc in paired])
                return float(P.mean()), float(R.mean()), float(F1.mean())
            except Exception:
                return 0.0, 0.0, 0.0

        global_p, global_r, global_f1 = avg_scores(prediction, reference)
        if not per_section:
            return global_f1
        out = {
            "bertscore_global_p": global_p,
            "bertscore_global_r": global_r,
            "bertscore_global_f1": global_f1,
        }
        # per-section scores only when both texts are SOAP-sectioned
        if has_sections(reference) and has_sections(prediction):
            sections_ref = extract_sections(reference)
            sections_pred = extract_sections(prediction)
        else:
            sections_ref, sections_pred = {}, {}
        for tag in ["S", "O", "A", "P"]:
            pred_sec = sections_pred.get(tag, "")
            ref_sec = sections_ref.get(tag, "")
            if pred_sec and ref_sec:
                p, r, f1 = avg_scores(pred_sec, ref_sec)
            else:
                p, r, f1 = None, None, None
            out[f"bertscore_{tag}_p"] = p
            out[f"bertscore_{tag}_r"] = r
            out[f"bertscore_{tag}_f1"] = f1
        return out
    except Exception:
        return None if not per_section else {}
def compute_batch_bertscore(df: pd.DataFrame, bert_models: list, per_section: bool = False) -> pd.DataFrame:
    """
    If per_section=True and a single model is selected:
        returns per-section + global BERTScore columns for {p,r,f1}.
    Otherwise:
        returns per-model global columns: bertscore_{modelshort}_{p,r,f1}.
    """
    if not bert_models:
        return pd.DataFrame(index=df.index)
    preds = df["dsc_generated_clinical_report"].astype(str).tolist()
    refs = df["dsc_reference_free_text"].astype(str).tolist()
    add = {}
    single_model = len(bert_models) == 1
    for friendly in bert_models:
        # accept either a friendly label or a raw HF model id
        model_id = BERT_FRIENDLY_TO_MODEL.get(friendly, friendly)
        short = model_id.split("/")[-1].replace("-", "_")
        if per_section and single_model:
            keys = [
                f"bertscore_{scope}_{metric}"
                for scope in ["global", "S", "O", "A", "P"]
                for metric in ["p", "r", "f1"]
            ]
            col_data = {k: [] for k in keys}
            for pred, ref in zip(preds, refs):
                scores = compute_bertscore_single(ref, pred, model_id, per_section=True)
                for k in keys:
                    col_data[k].append(scores.get(k) if scores else None)
            add.update(col_data)
        else:
            scorer = get_bertscore_scorer(model_id)
            tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True)
            p_list, r_list, f1_list = [], [], []
            for pred, ref in zip(preds, refs):
                try:
                    paired = list(zip(
                        chunk_text_with_stride(pred, tokenizer),
                        chunk_text_with_stride(ref, tokenizer),
                    ))
                    if not paired:
                        p_list.append(None); r_list.append(None); f1_list.append(None)
                        continue
                    # one batched call per row scores all of its chunk pairs
                    P, R, F1 = scorer.score([pc for pc, _ in paired], [rc for _, rc in paired])
                    p_list.append(float(P.mean()))
                    r_list.append(float(R.mean()))
                    f1_list.append(float(F1.mean()))
                except Exception:
                    p_list.append(None); r_list.append(None); f1_list.append(None)
            add[f"bertscore_{short}_p"] = p_list
            add[f"bertscore_{short}_r"] = r_list
            add[f"bertscore_{short}_f1"] = f1_list
    return pd.DataFrame(add, index=df.index)
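# A minimal smoke test, assuming the dsc_* column names used above. Running it
# downloads model weights on first use, so treat it as a sketch, not a unit test.
if __name__ == "__main__":
    demo = pd.DataFrame(
        {
            "dsc_generated_clinical_report": ["S: paciente estável. O: afebril, PA 120x80."],
            "dsc_reference_free_text": ["S: paciente estável. O: sem febre, PA 120x80."],
        }
    )
    result = compute_batch_bertscore(demo, ["Portuguese (Br) Bert"])
    print(result.to_string(index=False))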