# searchcsv2/app.py
import torch
import pandas as pd
from sentence_transformers import SentenceTransformer, util
import gradio as gr
import re
from rank_bm25 import BM25Okapi
import numpy as np
# Load models
model = SentenceTransformer("distilbert-base-multilingual-cased")
modela = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
# Load data
df = pd.read_csv("cleaned1.csv")
df2 = pd.read_csv("cleaned2.csv")
df3 = pd.read_csv("cleaned3.csv")
# Load pre-computed embeddings
embeddings = torch.load("embeddings1_1.pt")
embeddings2 = torch.load("embeddings2_1.pt")
embeddings3 = torch.load("embeddings3_1.pt")
embeddingsa = torch.load("embeddings1.pt")
embeddingsa2 = torch.load("embeddings2.pt")
embeddingsa3 = torch.load("embeddings3.pt")
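# The *_1.pt embeddings pair with `model` (DistilBERT) and the unsuffixed ones
# with `modela` (MiniLM); each tensor must align row-for-row with its CSV,
# since scores are mapped back to questions/links by position.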
# Extract questions and links
df_questions = df["question"].values
df_links = df["link"].values
df2_questions = df2["question"].values
df2_links = df2["link"].values
df3_questions = df3["question"].values
df3_links = df3["url"].values
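# Common Arabic function words plus honorific/reporting formulas that recur in
# hadith-style text and carry no search signal.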
ARABIC_STOPWORDS = {
'ููŠ', 'ู…ู†', 'ุฅู„ู‰', 'ุนู†', 'ู…ุน', 'ู‡ุฐุง', 'ู‡ุฐู‡', 'ุฐู„ูƒ', 'ุชู„ูƒ',
'ุงู„ุชูŠ', 'ุงู„ุฐูŠ', 'ู…ุง', 'ู„ุง', 'ุฃู†', 'ุฃูˆ', 'ู„ูƒู†', 'ู‚ุฏ', 'ุญูƒู…', 'ู‚ุงู„',
'ูƒุงู†', 'ูƒุงู†ุช', 'ูŠูƒูˆู†', 'ุชูƒูˆู†', 'ู„ู‡', 'ู„ู‡ุง', 'ู„ู‡ู…', 'ูˆ', 'ุฃู…', 'ุฅู†',
'ุฑุถูŠ', 'ุนู„ูŠู‡ุง', 'ุนู†ู‡ู…', 'ุนู†ู‡', 'ุนู„ูŠู‡ู…', 'ุตู„ู‰', 'ูˆุณู„ู…',
    'ุณู„ุงู…', 'ุนู„ูŠู‡', 'ุงู„ุฑุณูˆู„', 'ุงู„ู†ุจูŠ', 'ุงู„ุณู„ุงู…', 'ุญุฏูŠุซ', 'ุงุญุงุฏูŠุซ'
}
def arabic_word_tokenize(text):
    """Strip diacritics, extract Arabic words (length >= 2), drop stopwords."""
    if not isinstance(text, str):
        return []
    # Remove diacritics (tashkeel) and tatweel
    text = re.sub(r'[\u064B-\u065F\u0670\u0640]', '', text)
    # Extract runs of Arabic letters only (length >= 2); restricting to the
    # letter range keeps out Arabic punctuation and digits, which share the
    # \u0600-\u06FF block
    tokens = re.findall(r'[\u0621-\u064A]{2,}', text)
    return [t for t in tokens if t not in ARABIC_STOPWORDS]
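# Illustrative example for arabic_word_tokenize (hypothetical query, not taken
# from the datasets): "ู…ูŽุง ุญููƒู’ู…ู ุงู„ุตู„ุงุฉ ููŠ ุงู„ุณูุฑุŸ" -> ['ุงู„ุตู„ุงุฉ', 'ุงู„ุณูุฑ']
# ('ู…ุง', 'ุญูƒู…', 'ููŠ' are stopwords; diacritics and 'ุŸ' are stripped).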
def prepare_bm25_corpus(questions):
    """Prepare tokenized corpus for BM25"""
    return [arabic_word_tokenize(question) for question in questions]
# Initialize BM25 models for each dataset
print("Initializing BM25 models...")
bm25_corpus1 = prepare_bm25_corpus(df_questions)
bm25_corpus2 = prepare_bm25_corpus(df2_questions)
bm25_corpus3 = prepare_bm25_corpus(df3_questions)
bm25_model1 = BM25Okapi(bm25_corpus1)
bm25_model2 = BM25Okapi(bm25_corpus2)
bm25_model3 = BM25Okapi(bm25_corpus3)
print("BM25 models initialized!")
corpus_length1 = len(df_questions)
corpus_length2 = len(df2_questions)
corpus_length3 = len(df3_questions)
def compute_bm25_scores(query, bm25_model, corpus_length):
"""Compute BM25 scores for a query"""
query_tokens = arabic_word_tokenize(query)
if not query_tokens:
return np.zeros(corpus_length)
scores = bm25_model.get_scores(query_tokens)
return scores
def compute_word_overlap(query, questions):
"""Enhanced word overlap computation"""
query_words = set(arabic_word_tokenize(query))
if len(query_words) == 0:
return [0.0] * len(questions)
overlaps = []
for q in questions:
q_words = set(arabic_word_tokenize(q))
if len(q_words) == 0:
overlaps.append(0.0)
continue
# Use Jaccard similarity (intersection over union)
intersection = len(query_words & q_words)
union = len(query_words | q_words)
jaccard = intersection / union if union > 0 else 0.0
# Also compute coverage (how much of query is matched)
coverage = intersection / len(query_words)
# Combine both: prioritize coverage but consider similarity
overlap_score = 0.7 * coverage + 0.3 * jaccard
overlaps.append(overlap_score)
return overlaps
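# Worked example for compute_word_overlap (hypothetical counts): a 4-word query
# sharing 3 words with a 6-word candidate gives coverage = 3/4 = 0.75 and
# Jaccard = 3/7 ≈ 0.43, so overlap_score = 0.7*0.75 + 0.3*0.43 ≈ 0.65.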
def normalize_scores(scores):
"""Normalize scores to 0-1 range"""
scores = np.array(scores)
if np.max(scores) == np.min(scores):
return np.zeros_like(scores)
return (scores - np.min(scores)) / (np.max(scores) - np.min(scores))
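# e.g. normalize_scores([2.0, 5.0, 8.0]) -> array([0. , 0.5, 1. ]); constant
# score vectors map to all zeros rather than dividing by zero.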
def predict(text):
print(f"Received query: {text}")
    if not text or not text.strip():
        # Return a dict so the JSON output component always gets the same shape
        return {"error": "No query provided"}
# Semantic similarity scores
query_embedding = model.encode(text, convert_to_tensor=True)
query_embeddinga = modela.encode(text, convert_to_tensor=True)
# Cosine similarities (averaged from two models)
sim_scores1 = (util.pytorch_cos_sim(query_embedding, embeddings)[0] +
util.pytorch_cos_sim(query_embeddinga, embeddingsa)[0]) / 2
sim_scores2 = (util.pytorch_cos_sim(query_embedding, embeddings2)[0] +
util.pytorch_cos_sim(query_embeddinga, embeddingsa2)[0]) / 2
sim_scores3 = (util.pytorch_cos_sim(query_embedding, embeddings3)[0] +
util.pytorch_cos_sim(query_embeddinga, embeddingsa3)[0]) / 2
# BM25 scores
    bm25_scores1 = compute_bm25_scores(text, bm25_model1, corpus_length1)
    bm25_scores2 = compute_bm25_scores(text, bm25_model2, corpus_length2)
    bm25_scores3 = compute_bm25_scores(text, bm25_model3, corpus_length3)
# Word overlap scores
word_overlap1 = compute_word_overlap(text, df_questions)
word_overlap2 = compute_word_overlap(text, df2_questions)
word_overlap3 = compute_word_overlap(text, df3_questions)
# Normalize all scores for fair combination
norm_sim1 = normalize_scores(sim_scores1.cpu().numpy())
norm_sim2 = normalize_scores(sim_scores2.cpu().numpy())
norm_sim3 = normalize_scores(sim_scores3.cpu().numpy())
norm_bm25_1 = normalize_scores(bm25_scores1)
norm_bm25_2 = normalize_scores(bm25_scores2)
norm_bm25_3 = normalize_scores(bm25_scores3)
norm_word1 = normalize_scores(word_overlap1)
norm_word2 = normalize_scores(word_overlap2)
norm_word3 = normalize_scores(word_overlap3)
# Adaptive weighting based on query characteristics
query_words = arabic_word_tokenize(text)
query_length = len(query_words)
if query_length <= 4:
# Short queries: prioritize exact matches (BM25 + word overlap)
semantic_weight = 0.3
bm25_weight = 0.4
word_weight = 0.3
elif query_length <= 6:
# Medium queries: balanced approach
semantic_weight = 0.4
bm25_weight = 0.35
word_weight = 0.25
else:
# Long queries: prioritize semantic understanding
semantic_weight = 0.5
bm25_weight = 0.3
word_weight = 0.2
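    # e.g. a query that tokenizes to 5 content words lands in the middle band:
    # 0.4 semantic + 0.35 BM25 + 0.25 word overlap (each band sums to 1.0).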
def create_combined_results(questions, links, norm_semantic, norm_bm25, norm_word):
combined_results = []
for i in range(len(questions)):
semantic_score = float(norm_semantic[i])
bm25_score = float(norm_bm25[i])
word_score = float(norm_word[i])
# Enhanced scoring with BM25
combined_score = (semantic_weight * semantic_score +
bm25_weight * bm25_score +
word_weight * word_score)
# Boost results that perform well across multiple metrics
high_performance_count = sum([
semantic_score > 0.7,
bm25_score > 0.7,
word_score > 0.5
])
if high_performance_count >= 2:
boost = 0.1
elif high_performance_count >= 1:
boost = 0.05
else:
boost = 0.0
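            # e.g. semantic 0.8, bm25 0.75, word 0.3 trips two of the three
            # thresholds, earning the full +0.1 boost.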
final_score = combined_score + boost
combined_results.append({
"question": questions[i],
"link": links[i],
"semantic_score": semantic_score,
"bm25_score": bm25_score,
"word_overlap_score": word_score,
"combined_score": final_score
})
return combined_results
# Create combined results for all datasets
combined1 = create_combined_results(df_questions, df_links, norm_sim1, norm_bm25_1, norm_word1)
combined2 = create_combined_results(df2_questions, df2_links, norm_sim2, norm_bm25_2, norm_word2)
combined3 = create_combined_results(df3_questions, df3_links, norm_sim3, norm_bm25_3, norm_word3)
def get_diverse_top_results(combined_results, top_k=15):
"""Get diverse top results using multiple ranking strategies with BM25 threshold"""
# First, check if any results have BM25 score > 0.1
has_good_bm25 = any(item["bm25_score"] > 0.1 for item in combined_results)
if has_good_bm25:
# Filter results to only include those with BM25 > 0.1
filtered_results = [item for item in combined_results if item["bm25_score"] > 0.1]
else:
# If all BM25 scores are <= 0.1, use all results
filtered_results = combined_results
# Sort by combined score and get top candidates from filtered results
by_combined = sorted(filtered_results, key=lambda x: x["combined_score"], reverse=True)
        top_combined = by_combined[:top_k - 5]
# Get questions from top combined to avoid duplicates
used_questions = {item["question"] for item in top_combined}
# Add best BM25 result not already included (from filtered results)
by_bm25 = sorted(filtered_results, key=lambda x: x["bm25_score"], reverse=True)
bm25_pick = None
for item in by_bm25:
if item["question"] not in used_questions:
bm25_pick = item
break
# Add best semantic result not already included (from filtered results)
by_semantic = sorted(filtered_results, key=lambda x: x["semantic_score"], reverse=True)
semantic_pick = None
if bm25_pick:
used_questions.add(bm25_pick["question"])
for item in by_semantic:
if item["question"] not in used_questions:
semantic_pick = item
break
# Combine results
final_results = top_combined.copy()
if bm25_pick:
final_results.append(bm25_pick)
if semantic_pick:
final_results.append(semantic_pick)
return final_results[:top_k]
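    # With the default top_k=15, this yields up to 10 combined-score picks plus
    # at most one extra BM25 pick and one extra semantic pick (12 results max).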
# Get top results for each dataset
top1 = get_diverse_top_results(combined1)
top2 = get_diverse_top_results(combined2)
top3 = get_diverse_top_results(combined3)
    results = {
        "top1": top1,
        "top2": top2,
        "top3": top3,
"query_info": {
"query_length": query_length,
"weights": {
"semantic": semantic_weight,
"bm25": bm25_weight,
"word_overlap": word_weight
}
}
}
return results
title = "Enhanced Search with BM25"
iface = gr.Interface(
fn=predict,
inputs=[gr.Textbox(label="Search Query", lines=3)],
outputs='json',
title=title,
description="Arabic text search using combined semantic similarity, BM25, and word overlap scoring"
)
if __name__ == "__main__":
iface.launch()