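"""Gradio Space for Arabic question retrieval.

Ranks pre-indexed questions from three datasets by combining dense semantic
similarity (two SentenceTransformer models), BM25 lexical matching, and a
word-overlap score, with weights that adapt to the length of the query.
"""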
import re

import gradio as gr
import numpy as np
import pandas as pd
import torch
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer, util

# Load the two sentence-embedding models (their similarity scores are averaged in predict)
model = SentenceTransformer("distilbert-base-multilingual-cased")
modela = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

# Load data
df = pd.read_csv("cleaned1.csv")
df2 = pd.read_csv("cleaned2.csv")
df3 = pd.read_csv("cleaned3.csv")

# Load pre-computed question embeddings (one file per dataset for each model)
embeddings = torch.load("embeddings1_1.pt")
embeddings2 = torch.load("embeddings2_1.pt")
embeddings3 = torch.load("embeddings3_1.pt")
embeddingsa = torch.load("embeddings1.pt")
embeddingsa2 = torch.load("embeddings2.pt")
embeddingsa3 = torch.load("embeddings3.pt")
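# Note: if these tensors were saved from a GPU run, loading them in a CPU-only
# environment may require torch.load(path, map_location="cpu").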

# Extract questions and links
df_questions = df["question"].values
df_links = df["link"].values
df2_questions = df2["question"].values
df2_links = df2["link"].values
df3_questions = df3["question"].values
df3_links = df3["url"].values

ARABIC_STOPWORDS = {
    'في', 'من', 'إلى', 'عن', 'مع', 'هذا', 'هذه', 'ذلك', 'تلك',
    'التي', 'الذي', 'ما', 'لا', 'أن', 'أو', 'لكن', 'قد', 'حكم', 'قال',
    'كان', 'كانت', 'يكون', 'تكون', 'له', 'لها', 'لهم', 'و', 'أم', 'إن',
    'رضي', 'عليها', 'عنهم', 'عنه', 'عليهم', 'صلى', 'وسلم',
    'سلام', 'عليه', 'الرسول', 'النبي', 'السلام', 'حديث', 'احاديث'
}

def arabic_word_tokenize(text):
    if not isinstance(text, str):
        return []
    # Remove diacritics
    text = re.sub(r'[\u064B-\u065F\u0670]', '', text)
    # Extract only Arabic words (length >= 2)
    tokens = re.findall(r'[\u0600-\u06FF]{2,}', text)
    return [t for t in tokens if t not in ARABIC_STOPWORDS]
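# Illustrative example (hypothetical input): arabic_word_tokenize("في المدينةِ المنورة")
# strips the kasra diacritic, keeps Arabic tokens of length >= 2, drops the
# stopword "في", and returns ["المدينة", "المنورة"].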

def prepare_bm25_corpus(questions):
    """Prepare tokenized corpus for BM25"""
    tokenized_corpus = []
    for question in questions:
        tokens = arabic_word_tokenize(question)
        tokenized_corpus.append(tokens)
    return tokenized_corpus

# Initialize BM25 models for each dataset
print("Initializing BM25 models...")
bm25_corpus1 = prepare_bm25_corpus(df_questions)
bm25_corpus2 = prepare_bm25_corpus(df2_questions)
bm25_corpus3 = prepare_bm25_corpus(df3_questions)
bm25_model1 = BM25Okapi(bm25_corpus1)
bm25_model2 = BM25Okapi(bm25_corpus2)
bm25_model3 = BM25Okapi(bm25_corpus3)
print("BM25 models initialized!")

corpus_length1 = len(df_questions)
corpus_length2 = len(df2_questions)
corpus_length3 = len(df3_questions)

def compute_bm25_scores(query, bm25_model, corpus_length):
    """Compute BM25 scores for a query"""
    query_tokens = arabic_word_tokenize(query)
    if not query_tokens:
        return np.zeros(corpus_length)
    scores = bm25_model.get_scores(query_tokens)
    return scores
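# BM25Okapi.get_scores returns one unnormalized relevance score per indexed
# question; an all-stopword query falls back to a zero vector of corpus length.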

def compute_word_overlap(query, questions):
    """Enhanced word overlap computation"""
    query_words = set(arabic_word_tokenize(query))
    if len(query_words) == 0:
        return [0.0] * len(questions)
    overlaps = []
    for q in questions:
        q_words = set(arabic_word_tokenize(q))
        if len(q_words) == 0:
            overlaps.append(0.0)
            continue
        # Use Jaccard similarity (intersection over union)
        intersection = len(query_words & q_words)
        union = len(query_words | q_words)
        jaccard = intersection / union if union > 0 else 0.0
        # Also compute coverage (how much of the query is matched)
        coverage = intersection / len(query_words)
        # Combine both: prioritize coverage but consider similarity
        overlap_score = 0.7 * coverage + 0.3 * jaccard
        overlaps.append(overlap_score)
    return overlaps
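# Worked example (hypothetical counts): if the query has 4 content tokens and a
# candidate question has 5, with 2 tokens shared, then coverage = 2/4 = 0.5,
# Jaccard = 2/7 ≈ 0.286, and the overlap score = 0.7*0.5 + 0.3*0.286 ≈ 0.44.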

def normalize_scores(scores):
    """Normalize scores to 0-1 range"""
    scores = np.array(scores)
    if np.max(scores) == np.min(scores):
        return np.zeros_like(scores)
    return (scores - np.min(scores)) / (np.max(scores) - np.min(scores))
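# Min-max normalization is applied per dataset, e.g. raw scores [2.0, 5.0, 8.0]
# map to [0.0, 0.5, 1.0]; a constant score vector maps to all zeros.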

def predict(text):
    print(f"Received query: {text}")
    if not text or text.strip() == "":
        return "No query provided"

    # Semantic similarity scores
    query_embedding = model.encode(text, convert_to_tensor=True)
    query_embeddinga = modela.encode(text, convert_to_tensor=True)

    # Cosine similarities (averaged from the two models)
    sim_scores1 = (util.pytorch_cos_sim(query_embedding, embeddings)[0] +
                   util.pytorch_cos_sim(query_embeddinga, embeddingsa)[0]) / 2
    sim_scores2 = (util.pytorch_cos_sim(query_embedding, embeddings2)[0] +
                   util.pytorch_cos_sim(query_embeddinga, embeddingsa2)[0]) / 2
    sim_scores3 = (util.pytorch_cos_sim(query_embedding, embeddings3)[0] +
                   util.pytorch_cos_sim(query_embeddinga, embeddingsa3)[0]) / 2

    # BM25 scores
    bm25_scores1 = compute_bm25_scores(text, bm25_model1, corpus_length1)
    bm25_scores2 = compute_bm25_scores(text, bm25_model2, corpus_length2)
    bm25_scores3 = compute_bm25_scores(text, bm25_model3, corpus_length3)

    # Word overlap scores
    word_overlap1 = compute_word_overlap(text, df_questions)
    word_overlap2 = compute_word_overlap(text, df2_questions)
    word_overlap3 = compute_word_overlap(text, df3_questions)

    # Normalize all scores for fair combination
    norm_sim1 = normalize_scores(sim_scores1.cpu().numpy())
    norm_sim2 = normalize_scores(sim_scores2.cpu().numpy())
    norm_sim3 = normalize_scores(sim_scores3.cpu().numpy())
    norm_bm25_1 = normalize_scores(bm25_scores1)
    norm_bm25_2 = normalize_scores(bm25_scores2)
    norm_bm25_3 = normalize_scores(bm25_scores3)
    norm_word1 = normalize_scores(word_overlap1)
    norm_word2 = normalize_scores(word_overlap2)
    norm_word3 = normalize_scores(word_overlap3)

    # Adaptive weighting based on query characteristics
    query_words = arabic_word_tokenize(text)
    query_length = len(query_words)
    if query_length <= 4:
        # Short queries: prioritize exact matches (BM25 + word overlap)
        semantic_weight = 0.3
        bm25_weight = 0.4
        word_weight = 0.3
    elif query_length <= 6:
        # Medium queries: balanced approach
        semantic_weight = 0.4
        bm25_weight = 0.35
        word_weight = 0.25
    else:
        # Long queries: prioritize semantic understanding
        semantic_weight = 0.5
        bm25_weight = 0.3
        word_weight = 0.2
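    # Weight schedule by query length (in content tokens after stopword removal):
    #   <= 4 tokens : semantic 0.30, BM25 0.40, word overlap 0.30
    #   5-6 tokens  : semantic 0.40, BM25 0.35, word overlap 0.25
    #   >= 7 tokens : semantic 0.50, BM25 0.30, word overlap 0.20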

    def create_combined_results(questions, links, norm_semantic, norm_bm25, norm_word):
        combined_results = []
        for i in range(len(questions)):
            semantic_score = float(norm_semantic[i])
            bm25_score = float(norm_bm25[i])
            word_score = float(norm_word[i])

            # Enhanced scoring with BM25
            combined_score = (semantic_weight * semantic_score +
                              bm25_weight * bm25_score +
                              word_weight * word_score)

            # Boost results that perform well across multiple metrics
            high_performance_count = sum([
                semantic_score > 0.7,
                bm25_score > 0.7,
                word_score > 0.5
            ])
            if high_performance_count >= 2:
                boost = 0.1
            elif high_performance_count >= 1:
                boost = 0.05
            else:
                boost = 0.0
            final_score = combined_score + boost

            combined_results.append({
                "question": questions[i],
                "link": links[i],
                "semantic_score": semantic_score,
                "bm25_score": bm25_score,
                "word_overlap_score": word_score,
                "combined_score": final_score
            })
        return combined_results
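    # Example of the boost (hypothetical scores): semantic 0.8, BM25 0.75, and
    # word overlap 0.4 clear two of the three thresholds (0.7 / 0.7 / 0.5),
    # so the combined score is boosted by 0.1.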

    # Create combined results for all datasets
    combined1 = create_combined_results(df_questions, df_links, norm_sim1, norm_bm25_1, norm_word1)
    combined2 = create_combined_results(df2_questions, df2_links, norm_sim2, norm_bm25_2, norm_word2)
    combined3 = create_combined_results(df3_questions, df3_links, norm_sim3, norm_bm25_3, norm_word3)

    def get_diverse_top_results(combined_results, top_k=15):
        """Get diverse top results using multiple ranking strategies with a BM25 threshold"""
        # First, check whether any results have a BM25 score > 0.1
        has_good_bm25 = any(item["bm25_score"] > 0.1 for item in combined_results)
        if has_good_bm25:
            # Keep only results with BM25 > 0.1
            filtered_results = [item for item in combined_results if item["bm25_score"] > 0.1]
        else:
            # If all BM25 scores are <= 0.1, use all results
            filtered_results = combined_results

        # Sort by combined score and take the top candidates from the filtered results
        by_combined = sorted(filtered_results, key=lambda x: x["combined_score"], reverse=True)
        top_combined = by_combined[:top_k - 5]

        # Track questions already selected to avoid duplicates
        used_questions = {item["question"] for item in top_combined}

        # Add the best BM25 result not already included (from the filtered results)
        by_bm25 = sorted(filtered_results, key=lambda x: x["bm25_score"], reverse=True)
        bm25_pick = None
        for item in by_bm25:
            if item["question"] not in used_questions:
                bm25_pick = item
                break

        # Add the best semantic result not already included (from the filtered results)
        by_semantic = sorted(filtered_results, key=lambda x: x["semantic_score"], reverse=True)
        semantic_pick = None
        if bm25_pick:
            used_questions.add(bm25_pick["question"])
        for item in by_semantic:
            if item["question"] not in used_questions:
                semantic_pick = item
                break

        # Combine results
        final_results = top_combined.copy()
        if bm25_pick:
            final_results.append(bm25_pick)
        if semantic_pick:
            final_results.append(semantic_pick)
        return final_results[:top_k]
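    # With the default top_k=15 this yields at most 12 items per dataset:
    # up to 10 by combined score, plus one extra BM25 pick and one semantic pick.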

    # Get top results for each dataset
    top1 = get_diverse_top_results(combined1)
    top2 = get_diverse_top_results(combined2)
    top3 = get_diverse_top_results(combined3)

    results = {
        "top1": top1,
        "top2": top2,
        "top3": top3,
        "query_info": {
            "query_length": query_length,
            "weights": {
                "semantic": semantic_weight,
                "bm25": bm25_weight,
                "word_overlap": word_weight
            }
        }
    }
    return results
title = "Enhanced Search with BM25" | |
iface = gr.Interface( | |
fn=predict, | |
inputs=[gr.Textbox(label="Search Query", lines=3)], | |
outputs='json', | |
title=title, | |
description="Arabic text search using combined semantic similarity, BM25, and word overlap scoring" | |
) | |
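# For quick local testing (assuming the CSV and .pt files are present), predict()
# can also be called directly, e.g. predict("نص الاستعلام هنا"), which returns the
# same dictionary that the Gradio JSON output displays.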

if __name__ == "__main__":
    iface.launch()