rhodawk-ai-devops-engine / cve_intel.py
Architect8999's picture
feat: Hermes Zero-Day Research Engine - 9 new modules + UI tab
5a2c9e0 verified
"""
Rhodawk AI — CVE Intelligence Layer
=====================================
Queries NVD/CVE databases and implements SSEC (Semantic Similarity Exploit Chain)
to find code patterns similar to historically exploited vulnerabilities.
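Custom Algorithms:
    SSEC — Semantic Similarity Exploit Chain
        Intended to embed known exploit patterns and compare them to repo code
        using cosine similarity, finding "looks like CWE-X" candidates even
        without a test failure. NOTE: the SSEC_v1 code in this module currently
        matches a seed corpus of regex patterns line by line and scores each hit
        with a keyword heuristic.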
"""
from __future__ import annotations
import json
import math
import os
import re
import time
import glob
import hashlib
import requests
from dataclasses import dataclass, field
from typing import Optional
# Optional NVD API key read from the environment; an empty string means no
# key header is sent with NVD requests.
NVD_API_KEY = os.getenv("NVD_API_KEY", "")
# Endpoint of the NVD CVE API (version 2.0) used by _query_nvd_api.
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
# Directory where query results are cached as JSON files, one per cache key.
CACHE_DIR = "/data/cve_cache"
# Import-time side effect: make sure the cache directory exists. This raises
# OSError if the directory cannot be created.
os.makedirs(CACHE_DIR, exist_ok=True)
@dataclass
class CVERecord:
    """Plain data container describing a single CVE entry.

    The ``dataclass`` decorator generates ``__init__``, ``__repr__`` and
    ``__eq__`` from the fields below. Every field is required and is accepted
    positionally in declaration order.
    """

    # Identifier and free-text summary of the vulnerability.
    cve_id: str
    description: str

    # Severity label and numeric CVSS score.
    severity: str
    cvss_score: float

    # Weakness identifiers and affected products, as lists of strings.
    cwe_ids: list[str]
    affected_products: list[str]

    # Publication date kept as a string (format not enforced here), followed
    # by reference links.
    published: str
    references: list[str]
# ──────────────────────────────────────────────────────────────
# KNOWN EXPLOIT PATTERNS — SSEC seed corpus
# Each entry: (pattern_name, CWE, regex_or_keywords, severity)
# ──────────────────────────────────────────────────────────────
# Seed corpus consumed by _scan_file_for_patterns. Every regex is applied to a
# single source line with re.IGNORECASE, so each pattern must be valid for
# Python's `re` module (in particular: look-behinds must be fixed-width) and
# can only reason about what is visible on one line.
_EXPLOIT_PATTERNS = [
    # Memory corruption
    ("buffer_overflow_c", "CWE-119", r"strcpy|strcat|sprintf|gets\s*\(|scanf\s*\(", "CRITICAL"),
    ("integer_overflow", "CWE-190", r"(\w+)\s*\*\s*(\w+)\s*(?:>|<|==)\s*\d+|malloc\s*\(\s*\w+\s*\*", "HIGH"),
    # The freed name must be reused as `name->` or `name[` later on the line.
    # Both dereference forms live inside one group so the back-reference is
    # always bound (a top-level `|\1\[` alternative can never match).
    ("use_after_free", "CWE-416", r"free\s*\(\s*(\w+)\s*\).*\b(?:\1\s*->|\1\[)", "CRITICAL"),
    ("format_string", "CWE-134", r'printf\s*\(\s*(?!")[^,)]+\)|fprintf\s*\(\s*\w+\s*,\s*(?!")[^,)]+\)', "HIGH"),
    # A malloc() result dereferenced on the same line with no visible NULL
    # check. The "no check" condition is a look-ahead over the rest of the
    # line: Python rejects variable-width look-behinds, which previously made
    # this pattern fail to compile and be skipped silently.
    ("null_deref", "CWE-476", r"\b(\w+)\s*=\s*malloc\s*\((?!.*(?:\bif\s*\(|[!=]=\s*NULL)).*\b(?:\1\s*->|\1\[)", "HIGH"),
    # Injection
    ("sql_injection_py", "CWE-89", r'execute\s*\(\s*["\'].*%s|cursor\.execute\s*\(.*format\s*\(|\.query\s*\(.*\+', "CRITICAL"),
    ("sql_injection_js", "CWE-89", r'query\s*\(`[^`]*\$\{|\.query\s*\(\s*["\'][^"\']*"\s*\+', "CRITICAL"),
    ("cmd_injection_py", "CWE-78", r'os\.system\s*\(.*\+|subprocess\.call\s*\(.*shell\s*=\s*True', "CRITICAL"),
    ("path_traversal", "CWE-22", r'open\s*\(.*\+.*\.\.|os\.path\.join.*request|send_file.*request', "HIGH"),
    ("xss_reflected", "CWE-79", r'innerHTML\s*=.*req\.|document\.write.*req\.|render.*template.*request', "HIGH"),
    # NOTE: the trailing negative look-ahead is evaluated after `.*` has
    # consumed the rest of the line, so it never rejects a match.
    ("xxe", "CWE-611", r'ElementTree\.parse|lxml\.etree\.parse|minidom\.parseString.*(?!defusedxml)', "HIGH"),
    ("ssrf", "CWE-918", r'requests\.get\s*\(.*request\.|urllib.*urlopen.*request\.|httpx.*get.*request', "HIGH"),
    # Crypto
    ("weak_hash", "CWE-328", r'hashlib\.md5\s*\(|hashlib\.sha1\s*\(|MD5\s*\(|SHA1\s*\(', "MEDIUM"),
    ("hardcoded_secret", "CWE-798", r'password\s*=\s*["\'][^"\']{4,}["\']|secret\s*=\s*["\'][^"\']{4,}["\']|api_key\s*=\s*["\'][^"\']{8,}["\']', "HIGH"),
    ("weak_random", "CWE-338", r'random\.random\s*\(|Math\.random\s*\(|rand\s*\(\)', "MEDIUM"),
    ("insecure_tls", "CWE-295", r'verify\s*=\s*False|ssl\._create_unverified_context|rejectUnauthorized\s*:\s*false', "HIGH"),
    # Deserialization
    ("pickle_deserial", "CWE-502", r'pickle\.loads\s*\(.*request|pickle\.load\s*\(.*request', "CRITICAL"),
    ("yaml_deserial", "CWE-502", r'yaml\.load\s*\((?!.*Loader\s*=\s*yaml\.SafeLoader)', "HIGH"),
    ("json_deserial_uncheck", "CWE-502", r'eval\s*\(.*JSON|eval\s*\(.*json', "HIGH"),
    # Auth/AuthZ
    # NOTE: as with "xxe", the look-ahead after `.*` never rejects, so every
    # `@app.route` line is reported; decorators on other lines are not visible
    # to a single-line match.
    ("missing_auth", "CWE-306", r'@app\.route.*(?!@login_required|@require_auth|@authenticated)', "MEDIUM"),
    ("jwt_none_alg", "CWE-347", r'algorithms\s*=\s*\[\s*["\']none["\']|decode.*options.*verify_signature.*False', "CRITICAL"),
    ("debug_mode_prod", "CWE-489", r'DEBUG\s*=\s*True|app\.run.*debug\s*=\s*True', "MEDIUM"),
    # Race conditions
    ("toctou", "CWE-367", r'os\.path\.exists.*open\s*\(|access.*open\s*\(', "MEDIUM"),
    ("race_condition_thread", "CWE-362", r'threading\.Thread.*shared_var|global\s+\w+.*thread', "MEDIUM"),
]
def _scan_file_for_patterns(file_path: str, source: str) -> list[dict]:
    """Match every SSEC seed pattern against each line of one file.

    Args:
        file_path: Path recorded in each finding's ``"file"`` field.
        source: Full text of the file.

    Returns:
        One dict per (pattern, line) hit, ordered by pattern and then by line
        number. Lines whose stripped text starts with ``#`` or ``//`` are
        ignored, and a pattern that is not a valid regex yields no findings.
    """
    numbered_lines = list(enumerate(source.splitlines(), start=1))
    hits: list[dict] = []

    for name, cwe_id, expression, level in _EXPLOIT_PATTERNS:
        try:
            matcher = re.compile(expression, re.IGNORECASE)
        except re.error:
            # An unparsable seed pattern simply contributes nothing.
            continue

        for number, text in numbered_lines:
            stripped = text.strip()
            if stripped.startswith(("#", "//")):
                continue
            if matcher.search(text) is None:
                continue
            hits.append({
                "pattern": name,
                "cwe": cwe_id,
                "severity": level,
                "file": file_path,
                "line": number,
                "snippet": stripped[:120],
                "ssec_confidence": _compute_ssec_confidence(text, cwe_id),
            })

    return hits
def _compute_ssec_confidence(line: str, cwe: str) -> float:
    """Heuristic confidence score for a pattern hit on ``line``.

    Starts from 0.5 and adjusts by keyword, case-insensitively:
    +0.2 for an untrusted-input word, -0.15 for a validation word, reset to
    0.1 if the line looks like a comment (leading ``#``, ``//`` or ``*``),
    then -0.2 for a test-code marker. The result is clamped to [0.1, 1.0] and
    rounded to three decimals. ``cwe`` is accepted but not used.
    """
    lowered = line.lower()

    def mentions(words):
        # True when any of the given substrings occurs in the lowered line.
        for word in words:
            if word in lowered:
                return True
        return False

    confidence = 0.5

    if mentions(("request", "input", "user", "external", "upload", "argv", "environ", "param")):
        confidence += 0.2

    if mentions(("sanitize", "escape", "validate", "encode", "filter", "check", "verify")):
        confidence -= 0.15

    # Comment-like lines discard whatever was accumulated so far.
    if line.lstrip().startswith(("#", "//", "*")):
        confidence = 0.1

    if mentions(("test_", "mock_", "fake_", "stub_", "assert", "unittest")):
        confidence -= 0.2

    if confidence > 1.0:
        confidence = 1.0
    elif confidence < 0.1:
        confidence = 0.1
    return round(confidence, 3)
def run_ssec_scan(
    repo_dir: str,
    focus_files: Optional[list[str]] = None,
    max_files: int = 200,
) -> dict:
    """Run the SSEC pattern scan over a repository checkout.

    Args:
        repo_dir: Root directory of the repository.
        focus_files: Optional paths relative to ``repo_dir``. When given (and
            non-empty) only those files that exist are scanned; otherwise the
            tree is searched recursively for known source extensions.
        max_files: Upper bound on the number of candidate files read.

    Returns:
        A summary dict with ``total_findings``, ``critical``, ``high``,
        ``unique_cwes`` (sorted), ``top_findings`` (the 20 best findings
        ordered by severity, then confidence), ``files_scanned`` (files that
        were actually read) and ``algorithm``.
    """
    extensions = ("*.py", "*.js", "*.ts", "*.go", "*.java", "*.rb", "*.c", "*.cpp", "*.h")
    excluded_dirs = {"node_modules", "site-packages", ".tox", ".git", "vendor"}

    if focus_files:
        candidates = [os.path.join(repo_dir, name) for name in focus_files]
        candidates = [path for path in candidates if os.path.exists(path)]
    else:
        # Escape the root so glob metacharacters in repo_dir are taken literally.
        root_pattern = glob.escape(repo_dir)
        candidates = []
        for ext in extensions:
            candidates.extend(
                glob.glob(os.path.join(root_pattern, "**", ext), recursive=True)
            )

    def _is_excluded(path: str) -> bool:
        # Compare directory components *relative to the repo root*, so that
        # the location of the checkout itself (e.g. /srv/vendor/app) or a
        # look-alike name (.github) cannot exclude files by accident.
        rel_dir = os.path.dirname(os.path.relpath(path, repo_dir))
        parts = os.path.normpath(rel_dir).split(os.sep)
        return any(part in excluded_dirs for part in parts)

    candidates = [path for path in candidates if not _is_excluded(path)]

    all_findings = []
    files_scanned = 0
    for fpath in candidates[:max_files]:
        rel = os.path.relpath(fpath, repo_dir)
        try:
            with open(fpath, encoding="utf-8", errors="replace") as handle:
                source = handle.read()
        except (OSError, ValueError):
            # Unreadable entry (directory, permissions, vanished file): skip it.
            continue
        files_scanned += 1
        all_findings.extend(_scan_file_for_patterns(rel, source))

    sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    all_findings.sort(
        key=lambda x: (sev_rank.get(x["severity"], 0), x["ssec_confidence"]),
        reverse=True,
    )

    return {
        "total_findings": len(all_findings),
        "critical": sum(1 for f in all_findings if f["severity"] == "CRITICAL"),
        "high": sum(1 for f in all_findings if f["severity"] == "HIGH"),
        # Sorted so the report is stable from run to run.
        "unique_cwes": sorted({f["cwe"] for f in all_findings}),
        "top_findings": all_findings[:20],
        "files_scanned": files_scanned,
        "algorithm": "SSEC_v1",
    }
def query_cve_intel(description: str, cwe_hint: Optional[str] = None) -> dict:
    """Look up CVEs related to ``description``, with caching and a local fallback.

    Resolution order:
      1. A cache file under ``CACHE_DIR`` that is less than seven days old.
      2. A live query through ``_query_nvd_api``; error-free responses are
         written to the cache (best effort).
      3. ``_local_cve_lookup`` whenever the result so far carries an error or
         no CVEs. This also applies to a cached empty result, so repeated
         calls return the same answer as the first one.

    Args:
        description: Free-text description of the suspected vulnerability.
        cwe_hint: Optional CWE identifier passed on to both lookups.

    Returns:
        The result dict produced by whichever source answered.
    """
    cache_key = hashlib.sha256(f"{description}{cwe_hint}".encode()).hexdigest()[:16]
    cache_file = os.path.join(CACHE_DIR, f"{cache_key}.json")
    max_age_seconds = 86400 * 7

    result = None
    try:
        if time.time() - os.path.getmtime(cache_file) < max_age_seconds:
            with open(cache_file, encoding="utf-8") as handle:
                result = json.load(handle)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache entry: fall through to a query.
        result = None

    if not isinstance(result, dict):
        result = _query_nvd_api(description, cwe_hint)
        if not result.get("error"):
            try:
                with open(cache_file, "w", encoding="utf-8") as handle:
                    json.dump(result, handle, indent=2)
            except (OSError, TypeError, ValueError):
                # Caching is an optimisation only; never fail the lookup on it.
                pass

    if result.get("error") or not result.get("cves"):
        result = _local_cve_lookup(description, cwe_hint)
    return result
def _summarize_nvd_cve(cve: dict) -> dict:
    """Reduce one NVD ``cve`` object to the compact summary used by this module."""
    metrics = cve.get("metrics") or {}
    severity, score = "UNKNOWN", 0.0
    # v3.1 and v3.0 keep their original priority; v4.0 and v2 are consulted
    # only when neither is present, so previously scored CVEs are unchanged.
    for metric_key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV40", "cvssMetricV2"):
        entries = metrics.get(metric_key)
        if not entries:
            continue
        entry = entries[0]
        cvss_data = entry.get("cvssData", {})
        # CVSS v2 entries carry baseSeverity next to cvssData, not inside it.
        severity = cvss_data.get("baseSeverity") or entry.get("baseSeverity") or "UNKNOWN"
        score = cvss_data.get("baseScore", 0.0)
        break

    descriptions = cve.get("descriptions", [])
    desc_text = next(
        (d.get("value", "") for d in descriptions if d.get("lang") == "en"),
        "",
    )
    # First description value of every weakness entry that has one.
    cwes = [
        w.get("description", [{}])[0].get("value", "")
        for w in cve.get("weaknesses", [])
        if w.get("description")
    ]
    return {
        "id": cve.get("id", ""),
        "description": desc_text[:300],
        "severity": severity,
        "cvss_score": score,
        "cwes": cwes,
        "published": cve.get("published", ""),
    }


def _query_nvd_api(description: str, cwe_hint: Optional[str] = None) -> dict:
    """Query the NVD CVE API 2.0 by keyword (and optionally by CWE).

    Args:
        description: Search text; only the first 100 characters are sent.
        cwe_hint: Optional CWE identifier sent as the ``cweId`` filter.

    Returns:
        On success ``{"cves": [...], "source": "nvd_api", "total": N}`` with
        at most five summarised CVEs. On any failure (non-200 status, network
        error, malformed payload) ``{"error": <message>, "cves": []}``; this
        function never raises.
    """
    params = {
        "keywordSearch": description[:100],
        "resultsPerPage": 5,
        "startIndex": 0,
    }
    if cwe_hint:
        params["cweId"] = cwe_hint
    headers = {"apiKey": NVD_API_KEY} if NVD_API_KEY else {}

    try:
        resp = requests.get(NVD_BASE, params=params, headers=headers, timeout=10)
        if resp.status_code != 200:
            return {"error": f"NVD API returned {resp.status_code}", "cves": []}
        data = resp.json()
        cves = [
            _summarize_nvd_cve(item.get("cve", {}))
            for item in data.get("vulnerabilities", [])[:5]
        ]
        return {"cves": cves, "source": "nvd_api", "total": data.get("totalResults", 0)}
    except Exception as e:
        # Deliberately broad: callers rely on an error dict to trigger the
        # offline fallback instead of handling exceptions.
        return {"error": str(e), "cves": []}
def _local_cve_lookup(description: str, cwe_hint: str = None) -> dict:
    """Offline fallback: return canned example entries for a vulnerability class.

    The CWE is taken from ``cwe_hint`` when it is truthy; otherwise the first
    keyword (in table order) found in the lower-cased description decides.
    The entries are illustrative placeholders built into this function.

    Returns:
        ``{"cves": [...], "source": "local_db", "matched_cwe": ..., "total": N}``.
        ``cves`` is empty when no CWE was matched or the CWE has no entry.
    """
    # NOTE(review): the ids are "analog" placeholders rather than real records
    # for these weaknesses — confirm before showing them as genuine CVEs.
    # Columns: CWE, id, description, severity, cvss_score.
    catalog_rows = (
        ("CWE-89", "CVE-2021-44228-analog", "SQL injection via unsanitized user input in ORM query", "CRITICAL", 9.8),
        ("CWE-502", "CVE-2019-20107-analog", "Unsafe deserialization of user-controlled pickle data", "CRITICAL", 9.8),
        ("CWE-78", "CVE-2021-3129-analog", "OS command injection via unsanitized shell argument", "CRITICAL", 9.8),
        ("CWE-22", "CVE-2018-1000116-analog", "Path traversal via ../ in user-supplied file path", "HIGH", 7.5),
        ("CWE-79", "CVE-2022-XXXX-xss", "Reflected XSS via unescaped user input in HTML response", "HIGH", 6.1),
        ("CWE-798", "CVE-2021-hardcoded", "Hardcoded credentials in source code", "HIGH", 7.5),
        ("CWE-295", "CVE-2021-tls-verify", "TLS certificate verification disabled allowing MITM", "HIGH", 7.4),
        ("CWE-347", "CVE-2020-jwt-none", "JWT 'none' algorithm accepted allowing token forgery", "CRITICAL", 9.8),
    )
    catalog = {}
    for cwe_id, cve_id, summary, level, score in catalog_rows:
        catalog.setdefault(cwe_id, []).append({
            "id": cve_id,
            "description": summary,
            "severity": level,
            "cvss_score": score,
        })

    # Ordered (keyword, CWE) pairs; earlier rows win when several keywords occur.
    keyword_table = (
        ("sql", "CWE-89"),
        ("pickle", "CWE-502"),
        ("deserializ", "CWE-502"),
        ("command", "CWE-78"),
        ("shell", "CWE-78"),
        ("path traversal", "CWE-22"),
        ("directory traversal", "CWE-22"),
        ("xss", "CWE-79"),
        ("cross-site", "CWE-79"),
        ("hardcoded", "CWE-798"),
        ("tls", "CWE-295"),
        ("ssl", "CWE-295"),
        ("jwt", "CWE-347"),
        ("token", "CWE-347"),
    )

    haystack = description.lower()
    resolved = cwe_hint
    if not resolved:
        # Keep the (falsy) hint itself when no keyword matches.
        resolved = next(
            (cwe_id for needle, cwe_id in keyword_table if needle in haystack),
            resolved,
        )

    matches = catalog.get(resolved, []) if resolved else []
    return {
        "cves": matches,
        "source": "local_db",
        "matched_cwe": resolved,
        "total": len(matches),
    }