import torch import pandas as pd from sentence_transformers import SentenceTransformer, util import gradio as gr import re from rank_bm25 import BM25Okapi import numpy as np # Load models model = SentenceTransformer("distilbert-base-multilingual-cased") modela = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") # Load data df = pd.read_csv("cleaned1.csv") df2 = pd.read_csv("cleaned2.csv") df3 = pd.read_csv("cleaned3.csv") # Load pre-computed embeddings embeddings = torch.load("embeddings1_1.pt") embeddings2 = torch.load("embeddings2_1.pt") embeddings3 = torch.load("embeddings3_1.pt") embeddingsa = torch.load("embeddings1.pt") embeddingsa2 = torch.load("embeddings2.pt") embeddingsa3 = torch.load("embeddings3.pt") # Extract questions and links df_questions = df["question"].values df_links = df["link"].values df2_questions = df2["question"].values df2_links = df2["link"].values df3_questions = df3["question"].values df3_links = df3["url"].values # ARABIC_STOPWORDS = { # 'في', 'من', 'إلى', 'عن', 'مع', 'هذا', 'هذه', 'ذلك', 'تلك', # 'التي', 'الذي', 'ما', 'لا', 'أن', 'أو', 'لكن', 'قد', 'حكم', 'قال', # 'كان', 'كانت', 'يكون', 'تكون', 'له', 'لها', 'لهم', 'و', 'أم', 'إن' # } ARABIC_STOPWORDS = { 'في', 'من', 'إلى', 'عن', 'مع', 'هذا', 'هذه', 'ذلك', 'تلك', 'التي', 'الذي', 'ما', 'لا', 'أن', 'أو', 'لكن', 'قد', 'حكم', 'قال', 'كان', 'كانت', 'يكون', 'تكون', 'له', 'لها', 'لهم', 'و', 'أم', 'إن', 'رضي', 'عليها', 'عنهم', 'عنه', 'عليهم', 'صلى', 'وسلم', 'سلام', 'عليه', 'الرسول', 'النبي', 'عليه', 'السلام', 'حديث', 'احاديث' } def arabic_word_tokenize(text): if not isinstance(text, str): return [] # Remove diacritics text = re.sub(r'[\u064B-\u065F\u0670]', '', text) # Extract only Arabic words (length ≥ 2) tokens = re.findall(r'[\u0600-\u06FF]{2,}', text) return [t for t in tokens if t not in ARABIC_STOPWORDS] def prepare_bm25_corpus(questions): """Prepare tokenized corpus for BM25""" tokenized_corpus = [] for question in questions: tokens = arabic_word_tokenize(question) tokenized_corpus.append(tokens) return tokenized_corpus # Initialize BM25 models for each dataset print("Initializing BM25 models...") bm25_corpus1 = prepare_bm25_corpus(df_questions) bm25_corpus2 = prepare_bm25_corpus(df2_questions) bm25_corpus3 = prepare_bm25_corpus(df3_questions) bm25_model1 = BM25Okapi(bm25_corpus1) bm25_model2 = BM25Okapi(bm25_corpus2) bm25_model3 = BM25Okapi(bm25_corpus3) print("BM25 models initialized!") corpus_length1 = len(df_questions) corpus_length2 = len(df2_questions) corpus_length3 = len(df3_questions) def compute_bm25_scores(query, bm25_model,corpus_length): """Compute BM25 scores for a query""" query_tokens = arabic_word_tokenize(query) if not query_tokens: return np.zeros(corpus_length) scores = bm25_model.get_scores(query_tokens) return scores def compute_word_overlap(query, questions): """Enhanced word overlap computation""" query_words = set(arabic_word_tokenize(query)) if len(query_words) == 0: return [0.0] * len(questions) overlaps = [] for q in questions: q_words = set(arabic_word_tokenize(q)) if len(q_words) == 0: overlaps.append(0.0) continue # Use Jaccard similarity (intersection over union) intersection = len(query_words & q_words) union = len(query_words | q_words) jaccard = intersection / union if union > 0 else 0.0 # Also compute coverage (how much of query is matched) coverage = intersection / len(query_words) # Combine both: prioritize coverage but consider similarity overlap_score = 0.7 * coverage + 0.3 * jaccard overlaps.append(overlap_score) return overlaps def normalize_scores(scores): """Normalize scores to 0-1 range""" scores = np.array(scores) if np.max(scores) == np.min(scores): return np.zeros_like(scores) return (scores - np.min(scores)) / (np.max(scores) - np.min(scores)) def predict(text): print(f"Received query: {text}") if not text or text.strip() == "": return "No query provided" # Semantic similarity scores query_embedding = model.encode(text, convert_to_tensor=True) query_embeddinga = modela.encode(text, convert_to_tensor=True) # Cosine similarities (averaged from two models) sim_scores1 = (util.pytorch_cos_sim(query_embedding, embeddings)[0] + util.pytorch_cos_sim(query_embeddinga, embeddingsa)[0]) / 2 sim_scores2 = (util.pytorch_cos_sim(query_embedding, embeddings2)[0] + util.pytorch_cos_sim(query_embeddinga, embeddingsa2)[0]) / 2 sim_scores3 = (util.pytorch_cos_sim(query_embedding, embeddings3)[0] + util.pytorch_cos_sim(query_embeddinga, embeddingsa3)[0]) / 2 # BM25 scores bm25_scores1 = compute_bm25_scores(text, bm25_model1,corpus_length1) bm25_scores2 = compute_bm25_scores(text, bm25_model2,corpus_length2) bm25_scores3 = compute_bm25_scores(text, bm25_model3,corpus_length3) # Word overlap scores word_overlap1 = compute_word_overlap(text, df_questions) word_overlap2 = compute_word_overlap(text, df2_questions) word_overlap3 = compute_word_overlap(text, df3_questions) # Normalize all scores for fair combination norm_sim1 = normalize_scores(sim_scores1.cpu().numpy()) norm_sim2 = normalize_scores(sim_scores2.cpu().numpy()) norm_sim3 = normalize_scores(sim_scores3.cpu().numpy()) norm_bm25_1 = normalize_scores(bm25_scores1) norm_bm25_2 = normalize_scores(bm25_scores2) norm_bm25_3 = normalize_scores(bm25_scores3) norm_word1 = normalize_scores(word_overlap1) norm_word2 = normalize_scores(word_overlap2) norm_word3 = normalize_scores(word_overlap3) # Adaptive weighting based on query characteristics query_words = arabic_word_tokenize(text) query_length = len(query_words) if query_length <= 4: # Short queries: prioritize exact matches (BM25 + word overlap) semantic_weight = 0.3 bm25_weight = 0.4 word_weight = 0.3 elif query_length <= 6: # Medium queries: balanced approach semantic_weight = 0.4 bm25_weight = 0.35 word_weight = 0.25 else: # Long queries: prioritize semantic understanding semantic_weight = 0.5 bm25_weight = 0.3 word_weight = 0.2 def create_combined_results(questions, links, norm_semantic, norm_bm25, norm_word): combined_results = [] for i in range(len(questions)): semantic_score = float(norm_semantic[i]) bm25_score = float(norm_bm25[i]) word_score = float(norm_word[i]) # Enhanced scoring with BM25 combined_score = (semantic_weight * semantic_score + bm25_weight * bm25_score + word_weight * word_score) # Boost results that perform well across multiple metrics high_performance_count = sum([ semantic_score > 0.7, bm25_score > 0.7, word_score > 0.5 ]) if high_performance_count >= 2: boost = 0.1 elif high_performance_count >= 1: boost = 0.05 else: boost = 0.0 final_score = combined_score + boost combined_results.append({ "question": questions[i], "link": links[i], "semantic_score": semantic_score, "bm25_score": bm25_score, "word_overlap_score": word_score, "combined_score": final_score }) return combined_results # Create combined results for all datasets combined1 = create_combined_results(df_questions, df_links, norm_sim1, norm_bm25_1, norm_word1) combined2 = create_combined_results(df2_questions, df2_links, norm_sim2, norm_bm25_2, norm_word2) combined3 = create_combined_results(df3_questions, df3_links, norm_sim3, norm_bm25_3, norm_word3) # def get_diverse_top_results(combined_results, top_k=5): # """Get diverse top results using multiple ranking strategies""" # # Sort by combined score and get top candidates # by_combined = sorted(combined_results, key=lambda x: x["combined_score"], reverse=True) # top_combined = by_combined[:3] # # Get questions from top combined to avoid duplicates # used_questions = {item["question"] for item in top_combined} # # Add best BM25 result not already included # by_bm25 = sorted(combined_results, key=lambda x: x["bm25_score"], reverse=True) # bm25_pick = None # for item in by_bm25: # if item["question"] not in used_questions: # bm25_pick = item # break # # Add best semantic result not already included # by_semantic = sorted(combined_results, key=lambda x: x["semantic_score"], reverse=True) # semantic_pick = None # if bm25_pick: # used_questions.add(bm25_pick["question"]) # for item in by_semantic: # if item["question"] not in used_questions: # semantic_pick = item # break # # Combine results # final_results = top_combined.copy() # if bm25_pick: # final_results.append(bm25_pick) # if semantic_pick: # final_results.append(semantic_pick) # return final_results[:top_k] def get_diverse_top_results(combined_results, top_k=15): """Get diverse top results using multiple ranking strategies with BM25 threshold""" # First, check if any results have BM25 score > 0.1 has_good_bm25 = any(item["bm25_score"] > 0.1 for item in combined_results) if has_good_bm25: # Filter results to only include those with BM25 > 0.1 filtered_results = [item for item in combined_results if item["bm25_score"] > 0.1] else: # If all BM25 scores are <= 0.1, use all results filtered_results = combined_results # Sort by combined score and get top candidates from filtered results by_combined = sorted(filtered_results, key=lambda x: x["combined_score"], reverse=True) top_combined = by_combined[:top_k-5] # Get questions from top combined to avoid duplicates used_questions = {item["question"] for item in top_combined} # Add best BM25 result not already included (from filtered results) by_bm25 = sorted(filtered_results, key=lambda x: x["bm25_score"], reverse=True) bm25_pick = None for item in by_bm25: if item["question"] not in used_questions: bm25_pick = item break # Add best semantic result not already included (from filtered results) by_semantic = sorted(filtered_results, key=lambda x: x["semantic_score"], reverse=True) semantic_pick = None if bm25_pick: used_questions.add(bm25_pick["question"]) for item in by_semantic: if item["question"] not in used_questions: semantic_pick = item break # Combine results final_results = top_combined.copy() if bm25_pick: final_results.append(bm25_pick) if semantic_pick: final_results.append(semantic_pick) return final_results[:top_k] # Get top results for each dataset top1 = get_diverse_top_results(combined1) top2 = get_diverse_top_results(combined2) top3 = get_diverse_top_results(combined3) results = { "top2": top2, "top3": top3, "top1": top1, "query_info": { "query_length": query_length, "weights": { "semantic": semantic_weight, "bm25": bm25_weight, "word_overlap": word_weight } } } return results title = "Enhanced Search with BM25" iface = gr.Interface( fn=predict, inputs=[gr.Textbox(label="Search Query", lines=3)], outputs='json', title=title, description="Arabic text search using combined semantic similarity, BM25, and word overlap scoring" ) if __name__ == "__main__": iface.launch()