# Source: Hugging Face Space file "app.py" - commit 4ae98ee ("Update app.py", verified), by SuriRaja.
import json
import os
import tempfile

import matplotlib.pyplot as plt
import pandas as pd
import streamlit as st
import torch
from fpdf import FPDF
from transformers import AutoTokenizer, AutoModelForCausalLM
# ---------------------------------------------------------------------
# Page configuration: browser-tab title and wide layout. This is the first
# Streamlit command issued in this file.
st.set_page_config(page_title="Smart Log Copilot", layout="wide")
# ---------------------------------------------------------------------
# Hugging Face model identifier handed to from_pretrained() in load_llm().
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
@st.cache_resource
def load_llm():
    """Load the tokenizer and causal language model named by ``MODEL_NAME``.

    The function is wrapped in ``st.cache_resource``; weights are placed with
    ``device_map="auto"``.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    # float16 when a CUDA device is visible, otherwise let Transformers
    # choose the dtype itself ("auto").
    if torch.cuda.is_available():
        dtype = torch.float16
    else:
        dtype = "auto"

    tok = AutoTokenizer.from_pretrained(MODEL_NAME)
    lm = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        torch_dtype=dtype,
    )
    return tok, lm


# Module-level handles used by llm().
tokenizer, model = load_llm()
def llm(prompt, max_new_tokens=150):
    """Greedily generate a continuation of *prompt* and return only the new text.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        prompt: Full text fed to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded continuation with special tokens removed and surrounding
        whitespace stripped.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    output = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=False,
    )
    # For a causal LM the returned sequence is the prompt tokens followed by
    # the continuation. Drop the prompt by token count instead of
    # str.replace(): string replacement leaves the prompt in place whenever
    # decode() does not reproduce it character-for-character, and would also
    # delete any repetition of the prompt inside the continuation.
    prompt_len = inputs["input_ids"].shape[1]
    new_tokens = output[0][prompt_len:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
# Instruction text prepended to the user's question in extract_intent(); it
# asks the model for a single JSON object describing the requested action.
# (The dash in the first sentence was mis-encoded; it is restored to U+2014.)
INTENT_SYSTEM_PROMPT = """
Convert the user question into JSON for log analysis — no explanation, no text before or after.
VALID actions:
- "run_log_query"
- "scan_anomalies"
- "user_risk_report"
- "global_risk_report"
FORMAT:
{
"action": "",
"parameters": {
"users": "any" or ["username"],
"time_range": "",
"focus": "",
"extra": ""
}
}
Return JSON only.
"""
# Instruction text prepended to the question / row count / anomaly list when
# asking the model for the narrative summary.
SUMMARY_SYSTEM_PROMPT = """
Write a SOC-style incident summary for a security manager.
Include risk details + root cause + recommended actions.
"""
# Static placeholder image shown next to the report when anomalies exist.
PLACEHOLDER_IMG = "https://dummyimage.com/600x300/ff0000/ffffff&text=Anomaly+Screenshot"
# --------------------- SAFE INTENT EXTRACTION -----------------------
def extract_intent(question):
    """Ask the LLM to turn *question* into an intent dictionary.

    Args:
        question: The user's free-text question.

    Returns:
        The parsed JSON object (a ``dict``) found in the model reply, or
        ``None`` when the reply contains no ``{`` or the text starting at the
        first ``{`` is not valid JSON.
    """
    raw = llm(INTENT_SYSTEM_PROMPT + "\nUSER: " + question + "\nReturn JSON now:")
    start = raw.find("{")
    if start == -1:
        return None
    try:
        # raw_decode() parses exactly one JSON value beginning at `start` and
        # ignores whatever follows, so trailing chatter (even chatter that
        # contains braces) no longer makes parsing fail.
        intent, _end = json.JSONDecoder().raw_decode(raw, start)
    except json.JSONDecodeError:
        return None
    return intent
# --------------------- ANOMALY ANALYTICS ----------------------------
def detect_anomalies(df):
    """Run the rule-based anomaly checks over a log DataFrame.

    Each rule contributes at most one entry to the result:

    * ``login_failures``   - 3 or more rows whose ``status`` contains "fail"
      (case-insensitive).
    * ``off_hours``        - at least one row whose ``timestamp`` hour is
      >= 23 or < 6.
    * ``many_systems``     - some calendar day with 5 or more distinct
      ``system`` values.
    * ``impossible_travel`` - some calendar day with 2 or more distinct
      ``country`` values.

    A rule is skipped when the column(s) it needs are absent. The input
    frame is not modified.

    Args:
        df: DataFrame of log rows.

    Returns:
        A list of ``{"type": ..., "details": ...}`` dictionaries, in the rule
        order listed above.
    """
    anomalies = []
    columns = set(df.columns)

    if "status" in columns:
        # astype(str) keeps .str usable when the column was parsed as numbers.
        failed = df["status"].astype(str).str.contains("fail", case=False, na=False)
        n_failed = int(failed.sum())
        if n_failed >= 3:
            anomalies.append({"type": "login_failures", "details": f"{n_failed} failed logins"})

    # Every remaining rule is time-based.
    if "timestamp" not in columns:
        return anomalies

    # Parse into a local Series rather than writing back into df, so the
    # caller's frame (or a filtered slice of it) is left untouched.
    # Unparseable values become NaT and match none of the rules below.
    timestamps = pd.to_datetime(df["timestamp"], errors="coerce")

    hours = timestamps.dt.hour
    n_off = int(((hours >= 23) | (hours < 6)).sum())
    if n_off > 0:
        anomalies.append({"type": "off_hours", "details": f"{n_off} off-hours logins"})

    days = timestamps.dt.date

    if "system" in columns:
        systems_per_day = df.groupby(days)["system"].nunique()
        if (systems_per_day >= 5).any():
            anomalies.append({"type": "many_systems", "details": "5+ systems accessed in a day"})

    if "country" in columns:
        countries_per_day = df.groupby(days)["country"].nunique()
        if (countries_per_day >= 2).any():
            anomalies.append({"type": "impossible_travel", "details": "multiple countries in one day"})

    return anomalies
def risk_score(anoms):
    """Map a list of detected anomalies to a ``(icon, label)`` risk rating.

    Args:
        anoms: List of anomaly entries; only its length is used.

    Returns:
        ``(green circle, "Low")`` for no anomalies, ``(yellow circle,
        "Medium")`` for one or two, ``(red circle, "High")`` for three or more.
    """
    # The icons are written as escapes so they survive re-encoding of this
    # file; the previous literals were mis-decoded UTF-8.
    if not anoms:
        return "\U0001F7E2", "Low"      # U+1F7E2 large green circle
    if len(anoms) <= 2:
        return "\U0001F7E1", "Medium"   # U+1F7E1 large yellow circle
    return "\U0001F534", "High"         # U+1F534 large red circle
# -------------------------- PDF -------------------------------------
def _latin1_safe(text, errors="replace"):
    """Return *text* reduced to characters that can be encoded as Latin-1."""
    return str(text).encode("latin-1", errors).decode("latin-1")


def build_pdf(risk_icon, risk_label, summary, anomalies):
    """Render the security report to a temporary PDF file.

    Args:
        risk_icon: Icon string for the risk level (typically an emoji).
        risk_label: Textual risk level, e.g. "High".
        summary: Narrative summary text.
        anomalies: List of ``{"type": ..., "details": ...}`` dictionaries.

    Returns:
        Path of the written ``.pdf`` file. The file is created with
        ``delete=False``; the caller is responsible for removing it.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    def write_line(height, text):
        # The built-in "Arial" font can only encode Latin-1, so any other
        # character (emoji, typographic dashes, arbitrary LLM output) is
        # replaced instead of raising an encoding error.
        # NOTE(review): resetting x guards against fpdf2, whose multi_cell
        # leaves the cursor at the right edge by default; it is a no-op on
        # legacy PyFPDF.
        pdf.set_x(pdf.l_margin)
        pdf.multi_cell(0, height, _latin1_safe(text))

    # ASCII hyphen in the title: the original dash is outside Latin-1.
    write_line(8, "Security Report - Smart Log Copilot")

    # Drop (rather than "?"-replace) the icon when it cannot be rendered.
    icon = _latin1_safe(risk_icon, "ignore").strip()
    level = " ".join(part for part in (icon, str(risk_label)) if part)
    write_line(8, f"Risk Level: {level}")

    pdf.ln(4)
    write_line(6, summary)
    pdf.ln(4)
    write_line(6, "Detected anomalies:")
    if anomalies:
        for a in anomalies:
            write_line(6, f"- {a['type']}: {a['details']}")
    else:
        write_line(6, "None")

    # Reserve a unique path and close the handle before FPDF writes to it, so
    # no descriptor is leaked and the path can be reopened on every platform.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        path = tmp.name
    pdf.output(path)
    return path
# -------------------------- UI -------------------------------------
# Page title (U+1F50D magnifying glass; the previous literal was mis-decoded).
st.title("\U0001F50D Smart Log Copilot (CSV-powered LLM)")

uploaded = st.file_uploader("Upload CSV log file", type=["csv"])

# df stays None until a file has been uploaded AND parsed successfully.
df = None
if uploaded:
    try:
        uploaded.seek(0)  # make sure parsing starts at the beginning of the buffer
        df = pd.read_csv(uploaded)
    except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as exc:
        # Show a readable message instead of a stack trace for a bad upload.
        st.error(f"Could not read CSV: {exc}")

if df is not None:
    # Normalise headers so later column lookups are case-insensitive;
    # str() guards against non-string column labels.
    df.columns = [str(c).lower().strip() for c in df.columns]
    st.success(f"CSV loaded ({len(df)} rows)")
    st.dataframe(df.head(20))

st.markdown("---")

question = st.text_input("Ask a question about the logs:")

# Chat transcript kept across reruns as (role, text) tuples.
if "history" not in st.session_state:
    st.session_state.history = []
col1, col2 = st.columns([3, 2])

# Left column: run the analysis for the current question and show the chat.
with col1:
    if question and df is not None:
        # Streamlit re-executes the whole script on every widget interaction.
        # Key the analysis on (question, file) so a rerun with the same
        # inputs neither calls the LLM again nor appends duplicate messages
        # to the history.
        analysis_key = (question, uploaded.name, uploaded.size)
        cached = st.session_state.get("analysis")

        if cached is None or cached["key"] != analysis_key:
            with st.spinner("\U0001F9E0 Analyzing logs\u2026"):
                intent = extract_intent(question)

                # The model output is untrusted: tolerate a missing or
                # non-dict "parameters" entry.
                params = intent.get("parameters") if isinstance(intent, dict) else None
                if not isinstance(params, dict):
                    params = {}

                # "users" may be "any", a single name, or a list of names;
                # anything else is treated as "any".
                users = params.get("users", "any")
                if isinstance(users, str):
                    users = [users]
                elif not isinstance(users, list):
                    users = ["any"]
                wanted = {str(u).lower() for u in users}

                # No restriction requested, or no "user" column to filter on:
                # analyse every row.
                if not wanted or "any" in wanted or "user" not in df.columns:
                    filtered = df
                else:
                    filtered = df[df["user"].astype(str).str.lower().isin(wanted)]

                # Pass a copy so the analysis cannot alter the uploaded frame.
                anomalies = detect_anomalies(filtered.copy())
                icon, label = risk_score(anomalies)
                summary_prompt = (
                    SUMMARY_SYSTEM_PROMPT
                    + f"\nQUESTION: {question}\nMATCHED ROWS: {len(filtered)}\nANOMALIES: {json.dumps(anomalies)}\n"
                )
                summary = llm(summary_prompt)

            reply = f"{icon} **Risk Level: {label}**\n\n{summary}"
            st.session_state.history.append(("user", question))
            st.session_state.history.append(("assistant", reply))

            cached = {
                "key": analysis_key,
                "anomalies": anomalies,
                "icon": icon,
                "label": label,
                "summary": summary,
            }
            st.session_state.analysis = cached

        # Expose the (possibly cached) results under the names the right-hand
        # column reads.
        anomalies = cached["anomalies"]
        icon = cached["icon"]
        label = cached["label"]
        summary = cached["summary"]

    for role, text in st.session_state.history:
        st.chat_message(role).write(text)
# Right column: visuals and PDF export for the current analysis. The guard
# matches the one in the left column, so anomalies/icon/label/summary are
# defined whenever this body runs.
with col2:
    if df is not None and question:
        if anomalies:
            st.image(PLACEHOLDER_IMG, caption="Anomaly Screenshot")

        if "system" in df.columns:
            fig, ax = plt.subplots(figsize=(4, 2))
            df["system"].value_counts().plot(kind="bar", ax=ax)
            st.pyplot(fig)
            # pyplot keeps every figure alive until it is closed; without
            # this, one figure would accumulate per script rerun.
            plt.close(fig)

        # U+1F4C4 "page facing up"; the previous literal was mis-decoded.
        if st.button("\U0001F4C4 Download PDF Report"):
            pdf_file = build_pdf(icon, label, summary, anomalies)
            try:
                with open(pdf_file, "rb") as f:
                    pdf_bytes = f.read()
            finally:
                # build_pdf() writes a delete=False temp file; remove it once
                # its contents are in memory.
                os.remove(pdf_file)
            st.download_button(
                "Download PDF",
                pdf_bytes,
                file_name="security_report.pdf",
                mime="application/pdf",
            )