"""Gradio app that classifies emails as spam and reports phishing indicators."""

import html
import io
import os
import re
from collections import Counter

import gradio as gr
import joblib
import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Download NLTK data. Best effort: a failed download must not prevent startup,
# and one failing resource must not skip the remaining ones.
for _resource in ('stopwords', 'wordnet', 'punkt', 'omw-1.4'):
    try:
        nltk.download(_resource, quiet=True)
    except Exception as exc:  # startup boundary: report and carry on
        print(f"NLTK download failed for {_resource}: {exc}")


# Preprocessor with multi-language support
class AdvancedTextPreprocessor:
    """Clean text, drop stop words and optionally lemmatize it."""

    def __init__(self, use_lemmatization=True, languages=None):
        """Build the stop-word set for *languages* (defaults to English).

        Args:
            use_lemmatization: whether ``preprocess`` lemmatizes the text.
            languages: iterable of NLTK stop-word corpus names; ``None``
                means ``['english']``.
        """
        if languages is None:
            languages = ['english']
        self.use_lemmatization = use_lemmatization
        self.stop_words = set()
        for lang in languages:
            try:
                self.stop_words.update(stopwords.words(lang))
            except (LookupError, OSError) as exc:
                # Corpus not installed or unknown language: skip that
                # language instead of failing construction.
                print(f"Stop words unavailable for {lang!r}: {exc}")
        self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text):
        """Lower-case *text* and strip URLs, e-mail addresses and non-letters."""
        text = str(text).lower()
        text = re.sub(r'http\S+|www\S+|https\S+', '', text)
        text = re.sub(r'\S+@\S+', '', text)
        text = re.sub(r'\d+', '', text)
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        # Collapse every run of whitespace into a single space.
        return ' '.join(text.split())

    def remove_stopwords(self, text):
        """Return *text* without the words contained in ``self.stop_words``."""
        return ' '.join(w for w in text.split() if w not in self.stop_words)

    def lemmatize_text(self, text):
        """Lemmatize each word; return *text* unchanged if WordNet is unavailable."""
        try:
            return ' '.join(self.lemmatizer.lemmatize(w) for w in text.split())
        except (LookupError, OSError):
            return text

    def preprocess(self, text):
        """Run cleaning, stop-word removal and (optionally) lemmatization."""
        text = self.clean_text(text)
        text = self.remove_stopwords(text)
        if self.use_lemmatization:
            text = self.lemmatize_text(text)
        return text


preprocessor = AdvancedTextPreprocessor(languages=['english'])

# Load model and vectorizer
# NOTE: joblib.load unpickles arbitrary objects; only load artifacts that come
# from a trusted source.
model_path = "spam_classifier.joblib"
vectorizer_path = "tfidf_vectorizer.joblib"
model = joblib.load(model_path)
vectorizer = joblib.load(vectorizer_path)

# Spam indicators
SPAM_KEYWORDS = [
    'win', 'winner', 'congratulations', 'free', 'urgent', 'click', 'verify',
    'account', 'suspended', 'prize', 'lottery', 'cash', 'credit', 'loan',
    'limited time', 'act now', 'expire', 'claim', 'bonus',
]

# Credential phishing keywords
CREDENTIAL_KEYWORDS = [
    'password', 'username', 'login', 'credential', 'signin', 'sign in',
    'verify account', 'confirm identity', 'update payment',
    'billing information', 'security alert', 'unusual activity',
    'locked account', 'reset password',
]

# Common-word patterns per language, compiled once instead of on every call.
# Dict order matters: ties between languages go to the first entry.
_LANGUAGE_WORD_PATTERNS = {
    'English': re.compile(
        r'\b(?:the|and|for|are|but|not|you|all|have|her|was|one|our|out|if'
        r'|will|can|what|when|your|said|there|each|which|their|time|with'
        r'|about|many|then|them|these|some|would|make|like|him|into|has'
        r'|look|more|write|see|other|after|than|call|first|may|way|who|its'
        r'|now|people|been|had|how|did|get|made|find|where|much|too|very'
        r'|still|being|going)\b'
    ),
    'Spanish': re.compile(
        r'\b(?:el|la|de|que|y|en|un|por|con|para|es|los|se|las|del|al|más'
        r'|pero|su|le|ya|este|todo|esta|son|cuando|muy|sin|sobre|también'
        r'|hay|donde|quien|desde|todos|parte|tiene|esto|ese|cada|hasta|vida'
        r'|otros|aunque|esa|eso|hace|otra|gobierno|tan|durante|siempre|día'
        r'|tanto|ella|tres|sí|dijo|sido|gran|país|según|menos|mundo|año'
        r'|antes|estado|está|hombre|estar|caso|nada|hacer|años|tiempo|hoy'
        r'|mayor|ahora|momento|mucho|después|entre|gente|sistema|ser|ciudad'
        r'|manera|forma|dar|donde)\b'
    ),
    'French': re.compile(
        r'\b(?:le|de|un|être|et|à|il|avoir|ne|je|son|que|se|qui|ce|dans'
        r'|elle|au|pour|pas|sur|on|avec|tout|plus|leur|était|par|sans|tu'
        r'|ou|bien|dit|elle|si|comme|mais|peut|nous|aussi|autre|dont|où'
        r'|encore|maintenant|deux|même|déjà|avant|ici|peu|alors|sous|homme'
        r'|notre|très|même|quand|notre|sans|pourquoi|tout|après|jamais'
        r'|aussi|toujours|puis|jamais|rien|cela|jour)\b'
    ),
    'German': re.compile(
        r'\b(?:der|die|und|in|den|von|zu|das|mit|sich|des|auf|für|ist|im'
        r'|dem|nicht|ein|eine|als|auch|es|an|werden|aus|er|hat|dass|sie'
        r'|nach|wird|bei|einer|um|am|sind|noch|wie|einem|über|einen|das|so'
        r'|zum|war|haben|nur|oder|aber|vor|zur|bis|mehr|durch|man|sein|wenn'
        r'|sehr|ihr|seine|mark|gegen|vom|ganz|können|schon|wenn|habe|seine'
        r'|euro|ihre|dann|unter|wir|soll|ich|eines|kann|gut)\b'
    ),
    'Portuguese': re.compile(
        r'\b(?:o|de|a|e|do|que|em|ser|da|para|com|um|por|os|no|se|na|uma'
        r'|dos|mais|ao|como|mas|foi|das|tem|seu|sua|ou|quando|muito|já|eu'
        r'|também|pelo|pela|até|isso|ela|entre|depois|sem|mesmo|aos|seus'
        r'|quem|nas|esse|eles|essa|num|nem|suas|meu|às|minha|numa|pelos'
        r'|elas|havia|seja|qual|será|nós|tenho|lhe|deles|essas|esses|pelas'
        r'|este|dele|tu|te|você|vocês|lhes|meus|minhas)\b'
    ),
}

# Script-based fallback, checked in order. Kana is tested before Han because
# Japanese text normally mixes kana with Han characters, while Chinese text
# contains no kana.
_SCRIPT_PATTERNS = (
    ('Japanese', re.compile(r'[\u3040-\u309f\u30a0-\u30ff]')),
    ('Chinese', re.compile(r'[\u4e00-\u9fff]')),
    ('Korean', re.compile(r'[\uac00-\ud7af]')),
    ('Arabic', re.compile(r'[\u0600-\u06ff]')),
    ('Russian', re.compile(r'[\u0400-\u04ff]')),  # Cyrillic
    ('Hindi', re.compile(r'[\u0900-\u097f]')),  # Devanagari
)

# Minimum number of common-word hits before a language is reported.
_MIN_WORD_MATCHES = 3

# (regex, label) pairs describing common phishing phrasings.
_PHISHING_PATTERNS = (
    (re.compile(r'(click|tap|press).*(link|here|button)'),
     "Suspicious call-to-action"),
    (re.compile(r'(within|in).*(24|48|72).*(hour|hr)'),
     "Time pressure tactics"),
    (re.compile(r'(suspend|lock|close|terminate).*(account|access)'),
     "Account threat"),
    (re.compile(r'(confirm|verify|update).*(information|details|data)'),
     "Information request"),
)

# Where the bulk-processing results are written.
_BULK_OUTPUT_PATH = "spam_classification_results.csv"


def simple_language_detection(text):
    """Simple language detection based on common words and character sets.

    Counts hits of each language's common-word pattern. The best-scoring
    language is returned when it has at least ``_MIN_WORD_MATCHES`` hits;
    otherwise the text is checked for a non-Latin script.

    Returns:
        A language name, or ``'Unknown'``.
    """
    text_lower = text.lower()
    scores = {
        lang: len(pattern.findall(text_lower))
        for lang, pattern in _LANGUAGE_WORD_PATTERNS.items()
    }

    detected_lang = max(scores, key=scores.get)
    if scores[detected_lang] >= _MIN_WORD_MATCHES:
        return detected_lang

    # Too few word hits to trust: fall back to the script of the characters.
    # This also covers non-Latin text that happens to contain a stray
    # one-letter "word" such as "a" or "o".
    for lang, pattern in _SCRIPT_PATTERNS:
        if pattern.search(text):
            return lang
    return 'Unknown'


def detect_language_switching(text):
    """Simple detection of multiple languages in text.

    Splits *text* on ``'.'`` and detects the language of every piece longer
    than 10 characters.

    Returns:
        ``(switching, languages)`` where *languages* lists the distinct
        detected languages in order of first appearance and *switching* is
        True when there is more than one.
    """
    languages = []
    for sentence in text.split('.'):
        if len(sentence.strip()) > 10:
            lang = simple_language_detection(sentence)
            if lang != 'Unknown':
                languages.append(lang)

    # dict.fromkeys de-duplicates while keeping a deterministic order.
    unique_languages = list(dict.fromkeys(languages))
    return len(unique_languages) > 1, unique_languages


def check_credential_phishing(message):
    """Check if email is asking for credentials or personal info.

    Returns:
        ``(keywords, patterns)``: the ``CREDENTIAL_KEYWORDS`` found in the
        message (case-insensitive substring match) and the labels of the
        phishing patterns that matched.
    """
    message_lower = message.lower()
    found_credential_keywords = [
        keyword for keyword in CREDENTIAL_KEYWORDS if keyword in message_lower
    ]
    phishing_patterns = [
        label for pattern, label in _PHISHING_PATTERNS
        if pattern.search(message_lower)
    ]
    return found_credential_keywords, phishing_patterns


def extract_urls(message):
    """Extract all http/https URLs from the message."""
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    return re.findall(url_pattern, message)


def analyze_email(message):
    """Detailed email analysis.

    Returns:
        A dict with the keys ``word_count``, ``char_count``, ``language``,
        ``language_switching``, ``detected_languages``, ``urls``,
        ``has_urls``, ``has_email``, ``all_caps_words``,
        ``exclamation_marks``, ``spam_keywords``, ``credential_keywords``
        and ``phishing_patterns``.
    """
    analysis = {}
    words = message.split()

    # Basic stats
    analysis['word_count'] = len(words)
    analysis['char_count'] = len(message)

    # Language detection using simple method
    analysis['language'] = simple_language_detection(message)
    switching, detected = detect_language_switching(message)
    analysis['language_switching'] = switching
    analysis['detected_languages'] = detected

    # Extract URLs
    analysis['urls'] = extract_urls(message)
    analysis['has_urls'] = len(analysis['urls']) > 0
    analysis['has_email'] = bool(re.search(r'\S+@\S+', message))

    # Suspicious patterns
    analysis['all_caps_words'] = sum(
        1 for w in words if w.isupper() and len(w) > 2
    )
    analysis['exclamation_marks'] = message.count('!')

    # Spam keywords found
    message_lower = message.lower()
    analysis['spam_keywords'] = [
        kw for kw in SPAM_KEYWORDS if kw in message_lower
    ]

    # Credential phishing check
    credential_keywords, phishing_patterns = check_credential_phishing(message)
    analysis['credential_keywords'] = credential_keywords
    analysis['phishing_patterns'] = phishing_patterns

    return analysis


def highlight_spam_words(message, keywords):
    """Highlight spam keywords in the message.

    Returns HTML: every case-insensitive occurrence of a keyword is wrapped
    in ``<mark>`` (keeping the message's own casing) and all other text is
    HTML-escaped, because *message* is untrusted user input.
    """
    # Longest keywords first so that e.g. 'winner' wins over 'win' at the
    # same position; a single pass avoids re-matching inside inserted tags.
    ordered = sorted({kw for kw in keywords if kw}, key=len, reverse=True)
    if not ordered:
        return html.escape(message)

    pattern = re.compile('|'.join(re.escape(kw) for kw in ordered),
                         re.IGNORECASE)
    pieces = []
    position = 0
    for match in pattern.finditer(message):
        pieces.append(html.escape(message[position:match.start()]))
        pieces.append(f"<mark>{html.escape(match.group(0))}</mark>")
        position = match.end()
    pieces.append(html.escape(message[position:]))
    return ''.join(pieces)


def generate_security_tips(analysis, is_spam):
    """Generate personalized security tips based on analysis.

    Args:
        analysis: dict as returned by ``analyze_email``.
        is_spam: whether the classifier flagged the message.

    Returns:
        A non-empty list of tip strings.
    """
    tips = []

    if is_spam:
        tips.append("⚠️ This email has been flagged as spam. Exercise caution.")

    if analysis['credential_keywords']:
        tips.append("🔐 Never share passwords or credentials via email.")
        tips.append("🛡️ Legitimate companies won't ask for sensitive info via email.")

    if analysis['has_urls']:
        tips.append("🔗 Hover over links before clicking to verify destination.")
        tips.append("🌐 Check if URL matches the official company website.")

    if analysis['phishing_patterns']:
        tips.append("⏰ Be suspicious of emails creating artificial urgency.")
        tips.append("📞 Contact the company directly using official contact info.")

    if analysis['language_switching']:
        tips.append("🌍 Multiple languages detected - common tactic in international scams.")

    if analysis['all_caps_words'] > 3:
        tips.append("📢 Excessive capitalization is often used to create panic.")

    if not tips:
        tips.append("✅ Stay vigilant with all emails requesting action or information.")

    return tips


# NOTE(review): the inline markup/styling of the HTML fragments below could
# not be recovered from the file; minimal semantic tags are used while every
# visible text is kept. Restyle as needed.

def _table_row(label, value):
    """Render one label/value row of the analysis table."""
    return f"<tr><td><b>{label}</b></td><td>{value}</td></tr>"


def _result_html(is_spam):
    """Render the headline card for the classification result."""
    headline = "🔴 Spam Detected" if is_spam else "🟢 Legitimate Email"
    return f"<div><h2>{headline}</h2></div>"


def _details_html(analysis):
    """Render the statistics table for an analysis dict."""
    rows = []
    if analysis['language_switching']:
        langs = ', '.join(analysis['detected_languages'])
        rows.append(_table_row("⚠️ Language Switching:", f"Yes ({langs})"))

    if analysis['has_urls']:
        url_info = '⚠️ Yes (' + str(len(analysis['urls'])) + ')'
    else:
        url_info = '✓ No'

    rows.extend([
        _table_row("Detected Language:", analysis['language']),
        _table_row("Word Count:", analysis['word_count']),
        _table_row("Character Count:", analysis['char_count']),
        _table_row("Contains URLs:", url_info),
        _table_row("Contains Emails:",
                   'Yes' if analysis['has_email'] else 'No'),
        _table_row("ALL CAPS Words:", analysis['all_caps_words']),
        _table_row("Exclamation Marks:", analysis['exclamation_marks']),
    ])
    return ("<div><h3>📊 Email Analysis</h3><table>"
            + ''.join(rows)
            + "</table></div>")


def _credential_html(analysis):
    """Render the credential-phishing alert, or '' when nothing was found."""
    keywords = analysis['credential_keywords']
    patterns = analysis['phishing_patterns']
    if not (keywords or patterns):
        return ""

    parts = ["<div><h3>🔐 Credential Phishing Alert!</h3>"]
    if keywords:
        parts.append(
            f"<p><b>Suspicious Keywords:</b> {', '.join(keywords)}</p>")
    if patterns:
        parts.append(
            f"<p><b>Phishing Patterns:</b> {', '.join(patterns)}</p>")
    parts.append(
        "<p><b>⚠️ Warning:</b> This email appears to be attempting to steal "
        "your credentials or personal information.</p></div>")
    return ''.join(parts)


def _urls_html(urls):
    """Render the list of extracted URLs, or '' when there are none."""
    if not urls:
        return ""
    # URLs come straight from the user's message: escape before embedding.
    listing = '<br>'.join(f"<code>{html.escape(url)}</code>" for url in urls)
    return ("<div><h3>🔗 URLs Detected</h3>"
            f"<p>{listing}</p>"
            "<p>💡 <b>Tip:</b> Always verify URLs before clicking. "
            "Hover to see the actual destination.</p></div>")


def _keywords_html(message, keywords):
    """Render the found spam keywords and the highlighted message."""
    if not keywords:
        return ""
    return ("<div><h3>⚠️ Suspicious Keywords Found</h3>"
            f"<p><b>Keywords:</b> {', '.join(keywords)}</p>"
            f"<p>{highlight_spam_words(message, keywords)}</p></div>")


def _tips_html(tips):
    """Render the security tips as a bullet list."""
    items = ''.join(f"<li>{tip}</li>" for tip in tips)
    return f"<div><h3>🛡️ Security Tips</h3><ul>{items}</ul></div>"


def classify_email(message):
    """Classify one email and build the six HTML panels shown in the UI.

    Returns:
        A 6-tuple of HTML strings: result, analysis details, credential
        alert, URLs, keyword highlights, security tips. For an empty message
        or a failure, the first element carries the notice and the others
        are empty strings.
    """
    if not message or not message.strip():
        return "<div>Empty message</div>", "", "", "", "", ""

    try:
        # Get analysis
        analysis = analyze_email(message)

        # Classify
        cleaned = preprocessor.preprocess(message)
        vec = vectorizer.transform([cleaned])
        is_spam = model.predict(vec)[0] == 1

        tips = generate_security_tips(analysis, is_spam)
        return (
            _result_html(is_spam),
            _details_html(analysis),
            _credential_html(analysis),
            _urls_html(analysis['urls']),
            _keywords_html(message, analysis['spam_keywords']),
            _tips_html(tips),
        )
    except Exception as e:  # UI boundary: report instead of crashing the app
        print(f"Prediction error: {e}")
        return "<div>Error during classification</div>", "", "", "", "", ""


def process_bulk_emails(file):
    """Process bulk emails from file.

    Args:
        file: the uploaded file as delivered by ``gr.File``; either a path
            string or an object whose ``name`` attribute is the path.

    Returns:
        ``(summary, output_path)``; *output_path* is the written results CSV,
        or ``None`` when nothing could be processed.
    """
    if file is None:
        return "Please upload a file", None

    try:
        # Depending on the Gradio version/configuration the upload arrives
        # as a plain path or as a file-like wrapper exposing `.name`.
        if isinstance(file, (str, os.PathLike)):
            path = os.fspath(file)
        else:
            path = file.name

        # Read file
        suffix = path.lower()
        if suffix.endswith('.csv'):
            df = pd.read_csv(path)
        elif suffix.endswith('.txt'):
            with open(path, 'r', encoding='utf-8') as f:
                # splitlines() drops the line terminators readlines() keeps.
                emails = f.read().splitlines()
            df = pd.DataFrame({'email': emails})
        else:
            return "Unsupported file format. Use CSV or TXT", None

        if df.shape[1] == 0:
            return "Error processing file: no columns found", None

        # Get email column (the first one)
        email_col = df.columns[0]

        results = []
        for email in df[email_col]:
            if pd.isna(email):
                continue
            text = str(email)
            if not text.strip():
                continue

            cleaned = preprocessor.preprocess(text)
            vec = vectorizer.transform([cleaned])
            pred = model.predict(vec)[0]

            # Additional analysis
            analysis = analyze_email(text)

            results.append({
                'Email': text[:100] + '...' if len(text) > 100 else text,
                'Classification': 'Spam' if pred == 1 else 'Not Spam',
                'Language': analysis['language'],
                'Has_URLs': 'Yes' if analysis['has_urls'] else 'No',
                'Credential_Risk': 'High' if analysis['credential_keywords'] else 'Low',
            })

        results_df = pd.DataFrame(results)

        # Save to CSV
        output_path = _BULK_OUTPUT_PATH
        results_df.to_csv(output_path, index=False)

        spam_count = sum(
            1 for r in results if r['Classification'] == 'Spam')
        credential_risks = sum(
            1 for r in results if r['Credential_Risk'] == 'High')

        summary = f"✅ Processed {len(results)} emails\n"
        summary += f"🔴 Spam: {spam_count}\n"
        summary += f"🟢 Not Spam: {len(results) - spam_count}\n"
        summary += f"🔐 Credential Phishing Risk: {credential_risks}"

        return summary, output_path

    except Exception as e:  # UI boundary: surface the problem to the user
        return f"Error processing file: {str(e)}", None


# Enhanced examples with more diverse scenarios
examples = [
    ["Congratulations! You've won a $1000 gift card. Click here to claim your prize now!"],
    ["Thank you for registering for the conference. Your ticket and schedule are attached below. Looking forward to seeing you there."],
    ["Hello team, the project report is attached. Please review before tomorrow's meeting."],
    ["Hey John, are we still on for lunch tomorrow? Let me know!"],
    ["Make your business unforgettable with a new corporate identity. Order your custom logo design today — unlimited changes, fast delivery, and 100% satisfaction guaranteed."],
]

# Custom CSS
css = """
body {background-color: #f0f2f5; font-family: 'Segoe UI', sans-serif;}
h1 {color:#4B0082; text-align:center; margin-bottom:20px;}
.gr-button-primary {background-color:#4B0082; color:white; font-weight:bold;}
.gr-label {font-weight:bold;}
.gr-textbox textarea {font-size:14px;}
mark {animation: highlight 0.5s ease;}
@keyframes highlight {from {background-color: transparent;} to {background-color: #ffcccc;}}
"""

# Gradio interface
# NOTE(review): the nesting of the layout containers below was inferred from
# statement order only — confirm against the intended design.
with gr.Blocks(css=css, theme=gr.themes.Soft(), title=" Email Spam Classifier") as demo:
    gr.Markdown("# 📧 Email Spam Classifier")

    with gr.Tabs():
        # Single Email Tab
        with gr.Tab("🔍 Single Email Check"):
            with gr.Row():
                with gr.Column(scale=2):
                    input_text = gr.Textbox(
                        lines=8,
                        placeholder="Paste your email here...",
                        label="📝 Email Message"
                    )
                    with gr.Row():
                        submit_btn = gr.Button("🔍 Check Email", variant="primary")
                        clear_btn = gr.ClearButton([input_text], value="🗑️ Clear")

                with gr.Column(scale=1):
                    output_label = gr.HTML(label="📊 Result")
                    analysis_output = gr.HTML(label="📋 Analysis Details")
                    credential_output = gr.HTML(label="🔐 Credential Phishing Check")
                    urls_output = gr.HTML(label="🔗 URLs Found")
                    keywords_output = gr.HTML(label="🔎 Keyword Highlights")
                    tips_output = gr.HTML(label="🛡️ Security Tips")

            gr.Examples(
                examples=examples,
                inputs=input_text,
                outputs=[output_label, analysis_output, credential_output,
                         urls_output, keywords_output, tips_output],
                fn=classify_email
            )

            submit_btn.click(
                fn=classify_email,
                inputs=input_text,
                outputs=[output_label, analysis_output, credential_output,
                         urls_output, keywords_output, tips_output]
            )

            input_text.submit(
                fn=classify_email,
                inputs=input_text,
                outputs=[output_label, analysis_output, credential_output,
                         urls_output, keywords_output, tips_output]
            )

        # Bulk Processing Tab
        with gr.Tab("📦 Bulk Processing"):
            gr.Markdown("### Upload a CSV or TXT file with emails (one per line)")
            gr.Markdown("*Results will include spam classification, language detection, and credential phishing risk*")

            with gr.Row():
                with gr.Column():
                    file_input = gr.File(label="📁 Upload File", file_types=[".csv", ".txt"])
                    bulk_btn = gr.Button("🚀 Process Bulk Emails", variant="primary")

                with gr.Column():
                    bulk_output = gr.Textbox(label="📊 Processing Summary", lines=6)
                    download_output = gr.File(label="⬇️ Download Results")

            bulk_btn.click(
                fn=process_bulk_emails,
                inputs=file_input,
                outputs=[bulk_output, download_output]
            )

if __name__ == "__main__":
    demo.launch()