# Page-header residue from the hosting site (not part of the program): "Spaces: Sleeping"
import html
import io
import os
import re
import tempfile
from collections import Counter

import gradio as gr
import joblib
import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
# Fetch the NLTK resources this app relies on. Each resource is attempted on
# its own so that one failed download does not skip the remaining ones, and a
# failure is reported instead of being silently discarded. Startup continues
# either way: this is a best-effort step.
for _resource in ('stopwords', 'wordnet', 'punkt', 'omw-1.4'):
    try:
        nltk.download(_resource, quiet=True)
    except Exception as exc:  # never block startup on a download problem
        print(f"NLTK download failed for {_resource!r}: {exc}")
class AdvancedTextPreprocessor:
    """Normalize raw email text before it is vectorized.

    The pipeline lower-cases the text, removes URLs, e-mail addresses and
    every character that is not an ASCII letter or whitespace, drops stop
    words and (optionally) lemmatizes the remaining tokens.
    """

    # Compiled once at class creation; reused by every clean_text() call.
    _URL_RE = re.compile(r'(?:http|www)\S+')
    _EMAIL_RE = re.compile(r'\S+@\S+')
    # Digits are covered by this class too, so no separate digit pass is needed.
    _NON_LETTER_RE = re.compile(r'[^a-zA-Z\s]')

    def __init__(self, use_lemmatization=True, languages=('english',)):
        """Build the stop-word set and the lemmatizer.

        Args:
            use_lemmatization: When true, ``preprocess`` lemmatizes tokens.
            languages: Iterable of NLTK stop-word list names to merge. A
                language whose list cannot be loaded is skipped.
        """
        self.use_lemmatization = use_lemmatization
        self.stop_words = set()
        for lang in languages:
            try:
                self.stop_words.update(stopwords.words(lang))
            except Exception:
                # Best effort: an unavailable stop-word list must not prevent
                # the preprocessor from being constructed.
                continue
        self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text):
        """Return ``text`` lower-cased with URLs, e-mails and non-letters removed.

        Any value is accepted; it is converted with ``str()`` first. Runs of
        whitespace are collapsed to single spaces.
        """
        text = str(text).lower()
        text = self._URL_RE.sub('', text)
        text = self._EMAIL_RE.sub('', text)
        text = self._NON_LETTER_RE.sub('', text)
        return ' '.join(text.split())

    def remove_stopwords(self, text):
        """Return ``text`` without the whitespace-separated tokens in ``stop_words``."""
        return ' '.join(w for w in text.split() if w not in self.stop_words)

    def lemmatize_text(self, text):
        """Lemmatize every token; return ``text`` unchanged if lemmatizing fails."""
        try:
            return ' '.join(self.lemmatizer.lemmatize(w) for w in text.split())
        except Exception:
            # Best effort: fall back to the un-lemmatized text.
            return text

    def preprocess(self, text):
        """Run the full pipeline: clean, drop stop words, optionally lemmatize."""
        text = self.clean_text(text)
        text = self.remove_stopwords(text)
        if self.use_lemmatization:
            text = self.lemmatize_text(text)
        return text
# Shared text pipeline used by both the single-email and the bulk code paths.
preprocessor = AdvancedTextPreprocessor(languages=['english'])

# Serialized classifier and vectorizer, loaded once at import time.
model_path = "spam_classifier.joblib"
vectorizer_path = "tfidf_vectorizer.joblib"
model, vectorizer = (joblib.load(path) for path in (model_path, vectorizer_path))
# Words and phrases whose presence (as a lower-case substring) marks a
# message as suspicious in the analysis panel.
SPAM_KEYWORDS = [
    'win',
    'winner',
    'congratulations',
    'free',
    'urgent',
    'click',
    'verify',
    'account',
    'suspended',
    'prize',
    'lottery',
    'cash',
    'credit',
    'loan',
    'limited time',
    'act now',
    'expire',
    'claim',
    'bonus',
]

# Phrases that suggest the sender is fishing for login or payment details.
CREDENTIAL_KEYWORDS = [
    'password',
    'username',
    'login',
    'credential',
    'signin',
    'sign in',
    'verify account',
    'confirm identity',
    'update payment',
    'billing information',
    'security alert',
    'unusual activity',
    'locked account',
    'reset password',
]
# Common function words per language; a text is scored by how many whole-word
# occurrences it contains. Compiled once at import time.
_LANGUAGE_WORD_PATTERNS = {
    'English': re.compile(r'\b(?:the|and|for|are|but|not|you|all|have|her|was|one|our|out|if|will|can|what|when|your|said|there|each|which|their|time|with|about|many|then|them|these|some|would|make|like|him|into|has|look|more|write|see|other|after|than|call|first|may|way|who|its|now|people|been|had|how|did|get|made|find|where|much|too|very|still|being|going)\b'),
    'Spanish': re.compile(r'\b(?:el|la|de|que|y|en|un|por|con|para|es|los|se|las|del|al|más|pero|su|le|ya|este|todo|esta|son|cuando|muy|sin|sobre|también|hay|donde|quien|desde|todos|parte|tiene|esto|ese|cada|hasta|vida|otros|aunque|esa|eso|hace|otra|gobierno|tan|durante|siempre|día|tanto|ella|tres|sí|dijo|sido|gran|país|según|menos|mundo|año|antes|estado|está|hombre|estar|caso|nada|hacer|años|tiempo|hoy|mayor|ahora|momento|mucho|después|entre|gente|sistema|ser|ciudad|manera|forma|dar|donde)\b'),
    'French': re.compile(r'\b(?:le|de|un|être|et|à|il|avoir|ne|je|son|que|se|qui|ce|dans|elle|au|pour|pas|sur|on|avec|tout|plus|leur|était|par|sans|tu|ou|bien|dit|elle|si|comme|mais|peut|nous|aussi|autre|dont|où|encore|maintenant|deux|même|déjà|avant|ici|peu|alors|sous|homme|notre|très|même|quand|notre|sans|pourquoi|tout|après|jamais|aussi|toujours|puis|jamais|rien|cela|jour)\b'),
    'German': re.compile(r'\b(?:der|die|und|in|den|von|zu|das|mit|sich|des|auf|für|ist|im|dem|nicht|ein|eine|als|auch|es|an|werden|aus|er|hat|dass|sie|nach|wird|bei|einer|um|am|sind|noch|wie|einem|über|einen|das|so|zum|war|haben|nur|oder|aber|vor|zur|bis|mehr|durch|man|sein|wenn|sehr|ihr|seine|mark|gegen|vom|ganz|können|schon|wenn|habe|seine|euro|ihre|dann|unter|wir|soll|ich|eines|kann|gut)\b'),
    'Portuguese': re.compile(r'\b(?:o|de|a|e|do|que|em|ser|da|para|com|um|por|os|no|se|na|uma|dos|mais|ao|como|mas|foi|das|tem|seu|sua|ou|quando|muito|já|eu|também|pelo|pela|até|isso|ela|entre|depois|sem|mesmo|aos|seus|quem|nas|esse|eles|essa|num|nem|suas|meu|às|minha|numa|pelos|elas|havia|seja|qual|será|nós|tenho|lhe|deles|essas|esses|pelas|este|dele|tu|te|você|vocês|lhes|meus|minhas)\b'),
}

# Script-based fallback, checked in order. Kana is tested before the CJK
# ideograph block because Japanese text normally mixes kana with kanji and
# would otherwise be reported as Chinese.
_SCRIPT_PATTERNS = (
    ('Japanese', re.compile(r'[\u3040-\u309f\u30a0-\u30ff]')),
    ('Chinese', re.compile(r'[\u4e00-\u9fff]')),
    ('Korean', re.compile(r'[\uac00-\ud7af]')),
    ('Arabic', re.compile(r'[\u0600-\u06ff]')),
    ('Russian', re.compile(r'[\u0400-\u04ff]')),  # any Cyrillic is labelled Russian
    ('Hindi', re.compile(r'[\u0900-\u097f]')),
)

# Minimum number of function-word hits required to trust the word score.
_MIN_WORD_MATCHES = 3


def simple_language_detection(text):
    """Guess the language of ``text`` with a lightweight heuristic.

    The text is scored against per-language lists of common function words.
    If the best score reaches ``_MIN_WORD_MATCHES`` that language wins (ties
    go to the language listed first). Otherwise the text is checked for
    characters from a few non-Latin scripts; this fallback also runs when
    only one or two function words matched, so e.g. Chinese text containing
    a stray Latin token is still recognised.

    Args:
        text: The string to inspect.

    Returns:
        A language name such as ``'English'`` or ``'Japanese'``, or
        ``'Unknown'`` when neither heuristic is conclusive.
    """
    text_lower = text.lower()
    scores = {
        lang: len(pattern.findall(text_lower))
        for lang, pattern in _LANGUAGE_WORD_PATTERNS.items()
    }

    best = max(scores, key=scores.get)
    if scores[best] >= _MIN_WORD_MATCHES:
        return best

    for lang, pattern in _SCRIPT_PATTERNS:
        if pattern.search(text):
            return lang
    return 'Unknown'
def detect_language_switching(text):
    """Report whether ``text`` appears to mix several languages.

    The text is split on ``'.'``; every fragment longer than 10 characters
    (after stripping) is passed to ``simple_language_detection`` and
    ``'Unknown'`` results are ignored.

    Args:
        text: The message to inspect.

    Returns:
        ``(switching, languages)`` where ``languages`` lists each detected
        language once, in order of first appearance, and ``switching`` is
        true when more than one language was found.
    """
    detected = []
    for sentence in text.split('.'):
        if len(sentence.strip()) <= 10:
            continue
        lang = simple_language_detection(sentence)
        if lang != 'Unknown':
            detected.append(lang)

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result is stable between runs (a set would not guarantee that).
    unique_languages = list(dict.fromkeys(detected))
    return len(unique_languages) > 1, unique_languages
def check_credential_phishing(message):
    """Look for signs that a message is trying to harvest credentials.

    Args:
        message: Raw email text; matching is done on its lower-cased form.

    Returns:
        ``(keywords, patterns)``: the entries of ``CREDENTIAL_KEYWORDS`` that
        occur as substrings, and the labels of the phishing heuristics whose
        regular expression matched.
    """
    lowered = message.lower()

    keyword_hits = [kw for kw in CREDENTIAL_KEYWORDS if kw in lowered]

    # (regular expression, label reported to the user), evaluated in order.
    heuristics = (
        (r'(click|tap|press).*(link|here|button)', "Suspicious call-to-action"),
        (r'(within|in).*(24|48|72).*(hour|hr)', "Time pressure tactics"),
        (r'(suspend|lock|close|terminate).*(account|access)', "Account threat"),
        (r'(confirm|verify|update).*(information|details|data)', "Information request"),
    )
    pattern_hits = [label for regex, label in heuristics if re.search(regex, lowered)]

    return keyword_hits, pattern_hits
# An http(s) URL runs until whitespace or a character that cannot appear
# unescaped in a URL and would be dangerous inside HTML (<, >, quotes).
_URL_PATTERN = re.compile(r'https?://[^\s<>"\']+', re.IGNORECASE)


def _trim_url(url):
    """Strip sentence punctuation and an unbalanced ``)`` from the end of ``url``."""
    while url:
        last = url[-1]
        if last in '.,;:!?':
            url = url[:-1]
        elif last == ')' and url.count('(') < url.count(')'):
            # A closing bracket with no partner belongs to the surrounding text.
            url = url[:-1]
        else:
            break
    return url


def extract_urls(message):
    """Extract all http/https URLs from ``message``.

    Fragments (``#...``), query strings and ``~`` are kept as part of the URL,
    while markup characters and trailing sentence punctuation are not.

    Args:
        message: Text to scan.

    Returns:
        The URLs in order of appearance (duplicates are kept).
    """
    urls = []
    for candidate in _URL_PATTERN.findall(message):
        url = _trim_url(candidate)
        # Discard matches that were nothing but a scheme plus punctuation.
        if url.partition('://')[2]:
            urls.append(url)
    return urls
def analyze_email(message):
    """Collect the descriptive statistics shown next to a classification.

    Args:
        message: Raw email text.

    Returns:
        A dict with size counts, the detected language(s), extracted URLs,
        e-mail presence, shouting/exclamation counts, matched spam keywords
        and the credential-phishing findings.
    """
    tokens = message.split()
    lowered = message.lower()

    primary_language = simple_language_detection(message)
    switching, languages_seen = detect_language_switching(message)
    urls = extract_urls(message)
    # Upper-case tokens longer than two characters count as "shouting".
    shouted = sum(1 for token in tokens if token.isupper() and len(token) > 2)
    keyword_hits = [kw for kw in SPAM_KEYWORDS if kw in lowered]
    credential_hits, pattern_hits = check_credential_phishing(message)

    return {
        'word_count': len(tokens),
        'char_count': len(message),
        'language': primary_language,
        'language_switching': switching,
        'detected_languages': languages_seen,
        'urls': urls,
        'has_urls': len(urls) > 0,
        'has_email': bool(re.search(r'\S+@\S+', message)),
        'all_caps_words': shouted,
        'exclamation_marks': message.count('!'),
        'spam_keywords': keyword_hits,
        'credential_keywords': credential_hits,
        'phishing_patterns': pattern_hits,
    }
def highlight_spam_words(message, keywords):
    """Return ``message`` as HTML with every keyword occurrence highlighted.

    The message is untrusted user input that ends up inside an HTML
    component, so all of its text is HTML-escaped; only the ``<mark>`` tags
    added here are emitted as markup. Matching is case-insensitive and the
    original casing of the matched text is preserved. All keywords are
    matched in a single pass with longer keywords preferred, so overlapping
    keywords (``win`` / ``winner``) yield one clean highlight.

    Args:
        message: Raw email text.
        keywords: Iterable of literal keywords to highlight.

    Returns:
        An HTML-safe string.
    """
    # Longest first so that the alternation prefers 'winner' over 'win'.
    ordered = sorted({kw for kw in keywords if kw}, key=lambda kw: (-len(kw), kw))
    if not ordered:
        return html.escape(message)

    pattern = re.compile('|'.join(re.escape(kw) for kw in ordered), re.IGNORECASE)

    pieces = []
    cursor = 0
    for match in pattern.finditer(message):
        pieces.append(html.escape(message[cursor:match.start()]))
        pieces.append(
            '<mark style="background-color: #ffcccc; padding: 2px 4px; border-radius: 3px;">'
            f'{html.escape(match.group(0))}</mark>'
        )
        cursor = match.end()
    pieces.append(html.escape(message[cursor:]))
    return ''.join(pieces)
def generate_security_tips(analysis, is_spam):
    """Build the list of security tips that applies to one analysed email.

    Args:
        analysis: Dict providing ``credential_keywords``, ``has_urls``,
            ``phishing_patterns``, ``language_switching`` and
            ``all_caps_words``.
        is_spam: Whether the classifier flagged the message as spam.

    Returns:
        A non-empty list of tip strings; a generic reminder is returned when
        no specific tip applies.
    """
    advice = []

    if is_spam:
        advice.append("⚠️ This email has been flagged as spam. Exercise caution.")

    if analysis['credential_keywords']:
        advice.extend([
            "🔐 Never share passwords or credentials via email.",
            "🛡️ Legitimate companies won't ask for sensitive info via email.",
        ])

    if analysis['has_urls']:
        advice.extend([
            "🔗 Hover over links before clicking to verify destination.",
            "🌐 Check if URL matches the official company website.",
        ])

    if analysis['phishing_patterns']:
        advice.extend([
            "⏰ Be suspicious of emails creating artificial urgency.",
            "📞 Contact the company directly using official contact info.",
        ])

    if analysis['language_switching']:
        advice.append("🌍 Multiple languages detected - common tactic in international scams.")

    if analysis['all_caps_words'] > 3:
        advice.append("📢 Excessive capitalization is often used to create panic.")

    if advice:
        return advice
    return ["✅ Stay vigilant with all emails requesting action or information."]
def classify_email(message):
    """Classify one email and render the HTML panels for the UI.

    Args:
        message: Raw email text from the textbox; may be empty or ``None``.

    Returns:
        A 6-tuple of HTML strings: result card, analysis table, credential
        phishing alert, URL list, keyword highlights and security tips.
        Panels that do not apply are empty strings. If anything fails during
        classification the error is printed and an error card is returned
        with the other panels empty.
    """
    blanks = ("",) * 5
    if not message or not message.strip():
        return ("<div style='color:gray;'>Empty message</div>", *blanks)

    try:
        analysis = analyze_email(message)

        # Classify: label 1 is treated as spam.
        cleaned = preprocessor.preprocess(message)
        vec = vectorizer.transform([cleaned])
        is_spam = model.predict(vec)[0] == 1

        # Result card
        if is_spam:
            result_html = """
            <div style='border:2px solid #ff4d4d; border-radius:10px; background-color:#ffe6e6;
            padding:15px; font-size:18px; font-weight:bold; text-align:center;'>
            🔴 Spam Detected
            </div>
            """
        else:
            result_html = """
            <div style='border:2px solid #4dff4d; border-radius:10px; background-color:#e6ffe6;
            padding:15px; font-size:18px; font-weight:bold; text-align:center;'>
            🟢 Legitimate Email
            </div>
            """

        # Extra table row, only shown when several languages were detected.
        lang_warning = ""
        if analysis['language_switching']:
            langs = ', '.join(analysis['detected_languages'])
            lang_warning = f"<tr style='background-color:#fff3cd;'><td style='padding:5px;'><b>⚠️ Language Switching:</b></td><td>Yes ({langs})</td></tr>"

        # Analysis details
        url_summary = '⚠️ Yes (' + str(len(analysis['urls'])) + ')' if analysis['has_urls'] else '✓ No'
        email_summary = 'Yes' if analysis['has_email'] else 'No'
        details_html = f"""
        <div style='background-color:#f8f9fa; padding:15px; border-radius:8px; margin-top:10px;'>
        <h3 style='margin-top:0; color:#333;'>📊 Email Analysis</h3>
        <table style='width:100%; border-collapse: collapse;'>
        <tr><td style='padding:5px;'><b>Detected Language:</b></td><td>{analysis['language']}</td></tr>
        {lang_warning}
        <tr><td style='padding:5px;'><b>Word Count:</b></td><td>{analysis['word_count']}</td></tr>
        <tr><td style='padding:5px;'><b>Character Count:</b></td><td>{analysis['char_count']}</td></tr>
        <tr><td style='padding:5px;'><b>Contains URLs:</b></td><td>{url_summary}</td></tr>
        <tr><td style='padding:5px;'><b>Contains Emails:</b></td><td>{email_summary}</td></tr>
        <tr><td style='padding:5px;'><b>ALL CAPS Words:</b></td><td>{analysis['all_caps_words']}</td></tr>
        <tr><td style='padding:5px;'><b>Exclamation Marks:</b></td><td>{analysis['exclamation_marks']}</td></tr>
        </table>
        </div>
        """

        # Credential phishing warning
        credential_html = ""
        if analysis['credential_keywords'] or analysis['phishing_patterns']:
            keyword_line = ""
            if analysis['credential_keywords']:
                keyword_line = f"<p style='margin:5px 0;'><b>Suspicious Keywords:</b> {', '.join(analysis['credential_keywords'])}</p>"
            pattern_line = ""
            if analysis['phishing_patterns']:
                pattern_line = f"<p style='margin:5px 0;'><b>Phishing Patterns:</b> {', '.join(analysis['phishing_patterns'])}</p>"
            credential_html = f"""
            <div style='background-color:#ffebee; padding:15px; border-radius:8px; margin-top:10px; border-left:4px solid #d32f2f;'>
            <h3 style='margin-top:0; color:#d32f2f;'>🔐 Credential Phishing Alert!</h3>
            {keyword_line}
            {pattern_line}
            <p style='margin:10px 0 0 0; padding:10px; background-color:#fff; border-radius:5px;'>
            <b>⚠️ Warning:</b> This email appears to be attempting to steal your credentials or personal information.
            </p>
            </div>
            """

        # URLs detected. The URLs come straight from the untrusted message,
        # so they are escaped before being placed in an attribute or in text.
        urls_html = ""
        if analysis['urls']:
            safe_urls = [html.escape(url, quote=True) for url in analysis['urls']]
            links = '<br>'.join(
                f'<a href="{safe}" target="_blank" rel="noopener noreferrer" '
                f'style="color:#d32f2f; word-break:break-all;">{safe}</a>'
                for safe in safe_urls
            )
            urls_html = f"""
            <div style='background-color:#fff3cd; padding:15px; border-radius:8px; margin-top:10px; border-left:4px solid #ff9800;'>
            <h3 style='margin-top:0; color:#333;'>🔗 URLs Detected</h3>
            <div style='background-color:white; padding:10px; border-radius:5px; font-size:14px;'>
            {links}
            </div>
            <p style='margin:10px 0 0 0; font-size:13px; color:#666;'>
            💡 Tip: Always verify URLs before clicking. Hover to see the actual destination.
            </p>
            </div>
            """

        # Highlighted message with spam keywords
        keywords_html = ""
        if analysis['spam_keywords']:
            keywords_html = f"""
            <div style='background-color:#ffebee; padding:15px; border-radius:8px; margin-top:10px; border-left:4px solid #f44336;'>
            <h3 style='margin-top:0; color:#333;'>⚠️ Suspicious Keywords Found</h3>
            <p style='margin:5px 0;'><b>Keywords:</b> {', '.join(analysis['spam_keywords'])}</p>
            <div style='background-color:white; padding:10px; border-radius:5px; margin-top:10px; font-size:14px; line-height:1.6;'>
            {highlight_spam_words(message, analysis['spam_keywords'])}
            </div>
            </div>
            """

        # Security tips
        tips = generate_security_tips(analysis, is_spam)
        tip_items = ''.join('<li style="margin:5px 0;">' + tip + '</li>' for tip in tips)
        tips_html = f"""
        <div style='background-color:#e8f5e9; padding:15px; border-radius:8px; margin-top:10px; border-left:4px solid #4caf50;'>
        <h3 style='margin-top:0; color:#2e7d32;'>🛡️ Security Tips</h3>
        <ul style='margin:5px 0; padding-left:20px;'>
        {tip_items}
        </ul>
        </div>
        """

        return result_html, details_html, credential_html, urls_html, keywords_html, tips_html
    except Exception as e:
        # UI boundary: report the problem and show an error card instead of
        # letting the exception reach the user.
        print(f"Prediction error: {e}")
        return ("<div style='color:gray;'>Error during classification</div>", *blanks)
def process_bulk_emails(file):
    """Classify every email in an uploaded CSV or TXT file.

    For CSV files the first column is used; for TXT files every line is one
    email. Blank or missing entries are skipped.

    Args:
        file: The upload, either a filesystem path or an object exposing the
            path as ``.name``; ``None`` when nothing was uploaded.

    Returns:
        ``(summary, results_path)``. ``results_path`` is a CSV file holding
        one row per processed email, or ``None`` when nothing was produced
        (no file, unsupported format, or an error, which is described in
        ``summary``).
    """
    if file is None:
        return "Please upload a file", None

    try:
        # Accept both a plain path and a file-like wrapper with ``.name``.
        path = os.fspath(file if isinstance(file, (str, os.PathLike)) else file.name)
        extension = path.lower()

        if extension.endswith('.csv'):
            df = pd.read_csv(path)
        elif extension.endswith('.txt'):
            with open(path, 'r', encoding='utf-8') as f:
                # Drop the line terminator so it does not leak into results.
                emails = [line.rstrip('\r\n') for line in f]
            df = pd.DataFrame({'email': emails})
        else:
            return "Unsupported file format. Use CSV or TXT", None

        # Get email column
        email_col = df.columns[0]

        results = []
        for email in df[email_col]:
            if pd.isna(email):
                continue
            text = str(email)
            if not text.strip():
                continue

            cleaned = preprocessor.preprocess(text)
            vec = vectorizer.transform([cleaned])
            pred = model.predict(vec)[0]

            # Additional analysis
            analysis = analyze_email(text)
            results.append({
                'Email': text[:100] + '...' if len(text) > 100 else text,
                'Classification': 'Spam' if pred == 1 else 'Not Spam',
                'Language': analysis['language'],
                'Has_URLs': 'Yes' if analysis['has_urls'] else 'No',
                'Credential_Risk': 'High' if analysis['credential_keywords'] else 'Low',
            })

        columns = ['Email', 'Classification', 'Language', 'Has_URLs', 'Credential_Risk']
        results_df = pd.DataFrame(results, columns=columns)

        # Write to a unique temporary file so that concurrent requests do not
        # overwrite each other's results.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.csv',
            prefix='spam_classification_results_',
            delete=False,
            encoding='utf-8',
            newline='',
        ) as handle:
            results_df.to_csv(handle, index=False)
            output_path = handle.name

        spam_count = sum(1 for r in results if r['Classification'] == 'Spam')
        credential_risks = sum(1 for r in results if r['Credential_Risk'] == 'High')

        summary = f"✅ Processed {len(results)} emails\n"
        summary += f"🔴 Spam: {spam_count}\n"
        summary += f"🟢 Not Spam: {len(results) - spam_count}\n"
        summary += f"🔐 Credential Phishing Risk: {credential_risks}"
        return summary, output_path
    except Exception as e:
        # UI boundary: surface the failure as the summary text.
        return f"Error processing file: {str(e)}", None
# Sample messages offered below the textbox, covering spam and legitimate mail.
_EXAMPLE_MESSAGES = (
    "Congratulations! You've won a $1000 gift card. Click here to claim your prize now!",
    "Thank you for registering for the conference. Your ticket and schedule are attached below. Looking forward to seeing you there.",
    "Hello team, the project report is attached. Please review before tomorrow's meeting.",
    "Hey John, are we still on for lunch tomorrow? Let me know!",
    "Make your business unforgettable with a new corporate identity. Order your custom logo design today — unlimited changes, fast delivery, and 100% satisfaction guaranteed.",
)
# One single-element row per example, matching the single textbox input.
examples = [[text] for text in _EXAMPLE_MESSAGES]

# Stylesheet passed to gr.Blocks; the keyframes animate highlighted keywords.
css = """
body {background-color: #f0f2f5; font-family: 'Segoe UI', sans-serif;}
h1 {color:#4B0082; text-align:center; margin-bottom:20px;}
.gr-button-primary {background-color:#4B0082; color:white; font-weight:bold;}
.gr-label {font-weight:bold;}
.gr-textbox textarea {font-size:14px;}
mark {animation: highlight 0.5s ease;}
@keyframes highlight {from {background-color: transparent;} to {background-color: #ffcccc;}}
"""
# Gradio interface: one tab for checking a single email, one for bulk files.
with gr.Blocks(css=css, theme=gr.themes.Soft(), title=" Email Spam Classifier") as demo:
    gr.Markdown("# 📧 Email Spam Classifier")

    with gr.Tabs():
        # Single Email Tab
        with gr.Tab("🔍 Single Email Check"):
            with gr.Row():
                with gr.Column(scale=2):
                    input_text = gr.Textbox(
                        lines=8,
                        placeholder="Paste your email here...",
                        label="📝 Email Message",
                    )
                    with gr.Row():
                        submit_btn = gr.Button("🔍 Check Email", variant="primary")
                        clear_btn = gr.ClearButton([input_text], value="🗑️ Clear")
                with gr.Column(scale=1):
                    # NOTE(review): the original indentation was lost, so it is
                    # not certain whether the five detail panels below belong in
                    # this column or directly under the tab - confirm the layout.
                    output_label = gr.HTML(label="📊 Result")
                    analysis_output = gr.HTML(label="📋 Analysis Details")
                    credential_output = gr.HTML(label="🔐 Credential Phishing Check")
                    urls_output = gr.HTML(label="🔗 URLs Found")
                    keywords_output = gr.HTML(label="🔎 Keyword Highlights")
                    tips_output = gr.HTML(label="🛡️ Security Tips")

            # Every way of running the classifier fills the same six panels.
            result_panels = [
                output_label,
                analysis_output,
                credential_output,
                urls_output,
                keywords_output,
                tips_output,
            ]

            gr.Examples(
                examples=examples,
                inputs=input_text,
                outputs=result_panels,
                fn=classify_email,
            )

            # The button click and pressing Enter in the textbox do the same.
            for register in (submit_btn.click, input_text.submit):
                register(
                    fn=classify_email,
                    inputs=input_text,
                    outputs=result_panels,
                )

        # Bulk Processing Tab
        with gr.Tab("📦 Bulk Processing"):
            gr.Markdown("### Upload a CSV or TXT file with emails (one per line)")
            gr.Markdown("*Results will include spam classification, language detection, and credential phishing risk*")
            with gr.Row():
                with gr.Column():
                    file_input = gr.File(label="📁 Upload File", file_types=[".csv", ".txt"])
                    bulk_btn = gr.Button("🚀 Process Bulk Emails", variant="primary")
                with gr.Column():
                    bulk_output = gr.Textbox(label="📊 Processing Summary", lines=6)
                    download_output = gr.File(label="⬇️ Download Results")

            bulk_btn.click(
                fn=process_bulk_emails,
                inputs=file_input,
                outputs=[bulk_output, download_output],
            )

if __name__ == "__main__":
    demo.launch()