"""Smart Log Copilot: a Streamlit app that answers questions about CSV logs.

The app loads a small instruction-tuned LLM, lets the user upload a CSV log
file, runs a few rule-based anomaly checks on the (optionally user-filtered)
rows, asks the LLM for an incident summary, and offers a PDF report.
"""

import json
import os
import tempfile

import matplotlib.pyplot as plt
import pandas as pd
import streamlit as st
import torch
from fpdf import FPDF
from transformers import AutoModelForCausalLM, AutoTokenizer

# ---------------------------------------------------------------------
st.set_page_config(page_title="Smart Log Copilot", layout="wide")
# ---------------------------------------------------------------------

MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"


@st.cache_resource
def load_llm():
    """Load the tokenizer and model once per Streamlit server process.

    Returns:
        A ``(tokenizer, model)`` tuple for ``MODEL_NAME``. Half precision is
        requested when CUDA is available; otherwise the dtype is left to
        ``"auto"``.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        torch_dtype=torch.float16 if torch.cuda.is_available() else "auto",
    )
    return tokenizer, model


tokenizer, model = load_llm()


def llm(prompt, max_new_tokens=150):
    """Generate a greedy completion for *prompt* and return only the new text.

    Args:
        prompt: Full prompt text passed to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded completion with surrounding whitespace stripped.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    output = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=False,
    )
    # Slice off the prompt tokens instead of string-replacing the prompt in
    # the decoded text: decode(encode(prompt)) is not guaranteed to reproduce
    # the prompt exactly, and a leftover prompt would leak its own braces
    # into the JSON extraction done by extract_intent().
    prompt_len = inputs["input_ids"].shape[1]
    return tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True).strip()


INTENT_SYSTEM_PROMPT = """
Convert the user question into JSON for log analysis — no explanation, no text before or after.

VALID actions:
- "run_log_query"
- "scan_anomalies"
- "user_risk_report"
- "global_risk_report"

FORMAT:
{
  "action": "",
  "parameters": {
    "users": "any" or ["username"],
    "time_range": "",
    "focus": "",
    "extra": ""
  }
}

Return JSON only.
"""

SUMMARY_SYSTEM_PROMPT = """
Write a SOC-style incident summary for a security manager.
Include risk details + root cause + recommended actions.
"""

PLACEHOLDER_IMG = "https://dummyimage.com/600x300/ff0000/ffffff&text=Anomaly+Screenshot"


# --------------------- SAFE INTENT EXTRACTION -----------------------
def extract_intent(question):
    """Ask the LLM to turn *question* into an intent dict.

    Returns:
        The parsed JSON object, or ``None`` when the model output contains no
        brace-delimited span, the span is not valid JSON, or it does not
        decode to a dict.
    """
    raw = llm(INTENT_SYSTEM_PROMPT + "\nUSER: " + question + "\nReturn JSON now:")
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        intent = json.loads(raw[start:end + 1])
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return intent if isinstance(intent, dict) else None


# --------------------- ANOMALY ANALYTICS ----------------------------
def detect_anomalies(df):
    """Run the rule-based checks on *df* and return the anomalies found.

    Each check is skipped when the columns it needs are absent, so a CSV
    lacking e.g. ``system`` no longer raises ``KeyError``. The input frame is
    not modified.

    Args:
        df: Log rows. Columns consulted (when present): ``timestamp``,
            ``status``, ``system``, ``country``.

    Returns:
        A list of ``{"type": ..., "details": ...}`` dicts, possibly empty.
    """
    anomalies = []
    # Work on a copy: the caller may pass the uploaded frame itself or a
    # filtered slice of it, and neither should have its timestamps rewritten.
    df = df.copy()

    if "status" in df.columns:
        # astype(str) keeps the .str accessor valid for numeric status codes.
        failed = df["status"].astype(str).str.contains("fail", case=False, na=False)
        fail_count = int(failed.sum())
        if fail_count >= 3:
            anomalies.append(
                {"type": "login_failures", "details": f"{fail_count} failed logins"}
            )

    if "timestamp" not in df.columns:
        # Every remaining check is time-based.
        return anomalies

    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    hour = df["timestamp"].dt.hour
    day = df["timestamp"].dt.date

    # Unparseable timestamps become NaT; their hour is NaN and both
    # comparisons evaluate to False, so they are never counted as off-hours.
    off_count = int(((hour >= 23) | (hour < 6)).sum())
    if off_count > 0:
        anomalies.append(
            {"type": "off_hours", "details": f"{off_count} off-hours logins"}
        )

    if "system" in df.columns:
        systems_per_day = df.groupby(day)["system"].nunique()
        if (systems_per_day >= 5).any():
            anomalies.append(
                {"type": "many_systems", "details": "5+ systems accessed in a day"}
            )

    if "country" in df.columns:
        countries_per_day = df.groupby(day)["country"].nunique()
        if (countries_per_day >= 2).any():
            anomalies.append(
                {"type": "impossible_travel", "details": "multiple countries in one day"}
            )

    return anomalies


def risk_score(anoms):
    """Map the number of anomalies to an ``(icon, label)`` risk pair.

    No anomalies is Low, one or two is Medium, three or more is High.
    """
    if not anoms:
        return "🟢", "Low"
    if len(anoms) <= 2:
        return "🟡", "Medium"
    return "🔴", "High"


# -------------------------- PDF -------------------------------------
# Common typographic characters that have an obvious latin-1 stand-in.
_PDF_CHAR_MAP = str.maketrans(
    {
        "\u2013": "-",
        "\u2014": "-",
        "\u2018": "'",
        "\u2019": "'",
        "\u201c": '"',
        "\u201d": '"',
        "\u2026": "...",
    }
)


def _pdf_safe(text, errors="replace"):
    """Return *text* reduced to characters encodable as latin-1."""
    return str(text).translate(_PDF_CHAR_MAP).encode("latin-1", errors).decode("latin-1")


def _pdf_write(pdf, height, text):
    """Write *text* as a full-width paragraph starting at the left margin."""
    # Reset x explicitly: some fpdf versions leave the cursor at the right
    # edge after multi_cell, which makes the next full-width cell fail.
    pdf.set_x(pdf.l_margin)
    pdf.multi_cell(0, height, _pdf_safe(text))


def build_pdf(risk_icon, risk_label, summary, anomalies):
    """Render the security report to a temporary PDF file.

    The built-in "Arial" font only covers latin-1, so all text is sanitised
    first; without that the en dash in the title, the emoji risk icon, or any
    non-latin-1 character in the LLM summary aborts PDF generation.

    Args:
        risk_icon: Emoji/marker for the risk level (dropped if not latin-1).
        risk_label: Textual risk level.
        summary: Incident summary text.
        anomalies: List of ``{"type", "details"}`` dicts; may be empty.

    Returns:
        Path of the written ``.pdf`` file. The file is not deleted here; the
        caller owns its cleanup.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    _pdf_write(pdf, 8, "Security Report – Smart Log Copilot")
    # Drop (rather than "?"-replace) an icon that cannot be rendered.
    risk_parts = (_pdf_safe(risk_icon, errors="ignore").strip(), _pdf_safe(risk_label))
    _pdf_write(pdf, 8, "Risk Level: " + " ".join(p for p in risk_parts if p))
    pdf.ln(4)
    _pdf_write(pdf, 6, summary)
    pdf.ln(4)
    _pdf_write(pdf, 6, "Detected anomalies:")
    if anomalies:
        for a in anomalies:
            _pdf_write(pdf, 6, f"- {a['type']}: {a['details']}")
    else:
        _pdf_write(pdf, 6, "None")

    # Reserve a unique path, then close the handle before fpdf writes to it;
    # writing to a still-open temp file fails on some platforms.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        path = tmp.name
    pdf.output(path)
    return path


# -------------------------- ANALYSIS --------------------------------
def _filter_by_users(df, users):
    """Return the rows of *df* whose ``user`` matches *users* (case-insensitive).

    *users* comes from LLM-produced JSON, so it is treated as untrusted:
    ``"any"``, an empty list, an unsupported type, or a missing ``user``
    column all mean "no filtering".
    """
    if isinstance(users, str):
        candidates = [users]
    elif isinstance(users, (list, tuple)):
        candidates = [str(u) for u in users]
    else:
        candidates = []
    wanted = {u.strip().lower() for u in candidates if u.strip()}
    if not wanted or "any" in wanted or "user" not in df.columns:
        return df
    names = df["user"].astype(str).str.strip().str.lower()
    return df[names.isin(wanted)]


def _analyze(df, question):
    """Run intent extraction, anomaly detection and LLM summary for *question*.

    Returns:
        A dict with keys ``anomalies``, ``icon``, ``label``, ``summary`` and
        ``reply`` (the markdown shown in the chat).
    """
    intent = extract_intent(question)
    params = intent.get("parameters") if intent else None
    if not isinstance(params, dict):
        params = {}
    filtered = _filter_by_users(df, params.get("users", "any"))

    anomalies = detect_anomalies(filtered)
    icon, label = risk_score(anomalies)
    summary_prompt = (
        SUMMARY_SYSTEM_PROMPT
        + f"\nQUESTION: {question}\nMATCHED ROWS: {len(filtered)}\nANOMALIES: {json.dumps(anomalies)}\n"
    )
    summary = llm(summary_prompt)
    return {
        "anomalies": anomalies,
        "icon": icon,
        "label": label,
        "summary": summary,
        "reply": f"{icon} **Risk Level: {label}**\n\n{summary}",
    }


# -------------------------- UI -------------------------------------
st.title("🔍 Smart Log Copilot (CSV-powered LLM)")

uploaded = st.file_uploader("Upload CSV log file", type=["csv"])
df = pd.read_csv(uploaded) if uploaded else None

if df is not None:
    df.columns = [str(c).lower().strip() for c in df.columns]  # ensure case-insensitive
    st.success(f"CSV loaded ({len(df)} rows)")
    st.dataframe(df.head(20))

st.markdown("---")
question = st.text_input("Ask a question about the logs:")

if "history" not in st.session_state:
    st.session_state.history = []

col1, col2 = st.columns([3, 2])
analysis = None

with col1:
    if question and df is not None:
        # Streamlit re-executes this script on every widget interaction.
        # Cache the result per (file, question) so a rerun triggered by e.g.
        # the PDF button neither calls the LLM again nor duplicates history.
        analysis_key = (uploaded.name, uploaded.size, question)
        if st.session_state.get("analysis_key") != analysis_key:
            with st.spinner("🧠 Analyzing logs…"):
                fresh = _analyze(df, question)
            st.session_state.analysis_key = analysis_key
            st.session_state.analysis = fresh
            st.session_state.history.append(("user", question))
            st.session_state.history.append(("assistant", fresh["reply"]))
        analysis = st.session_state.analysis

    for role, text in st.session_state.history:
        st.chat_message(role).write(text)

with col2:
    if analysis is not None:
        if analysis["anomalies"]:
            st.image(PLACEHOLDER_IMG, caption="Anomaly Screenshot")

        if "system" in df.columns:
            system_counts = df["system"].value_counts()
            if not system_counts.empty:
                fig, ax = plt.subplots(figsize=(4, 2))
                system_counts.plot(kind="bar", ax=ax)
                st.pyplot(fig)
                # Figures are otherwise kept alive by pyplot across reruns.
                plt.close(fig)

        if st.button("📄 Download PDF Report"):
            pdf_file = build_pdf(
                analysis["icon"],
                analysis["label"],
                analysis["summary"],
                analysis["anomalies"],
            )
            try:
                with open(pdf_file, "rb") as f:
                    pdf_bytes = f.read()
            finally:
                # build_pdf leaves the temp file on disk; remove it once read.
                os.remove(pdf_file)
            st.download_button(
                "Download PDF",
                pdf_bytes,
                file_name="security_report.pdf",
                mime="application/pdf",
            )