"""
Rhodawk AI — CVE Intelligence Layer
=====================================
Queries NVD/CVE databases and implements SSEC (Semantic Similarity Exploit
Chain) to find code patterns similar to historically exploited vulnerabilities.

Custom Algorithms:
  SSEC — Semantic Similarity Exploit Chain
    Embeds known exploit patterns and compares them to repo code using
    cosine similarity. Finds "looks like CWE-X" candidates even without
    a test failure.

NOTE: the implementation in this module is regex-based. Each source line is
matched against the seed corpus in ``_EXPLOIT_PATTERNS`` and scored by a
keyword heuristic; no embeddings or cosine similarity are computed here.
"""

from __future__ import annotations

import functools
import glob
import hashlib
import json
import logging
import math
import os
import re
import time
from dataclasses import dataclass, field
from typing import Optional

try:
    import requests
except ImportError:  # NVD lookups are optional; the offline fallback still works.
    requests = None

_logger = logging.getLogger(__name__)

NVD_API_KEY = os.getenv("NVD_API_KEY", "")
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
CACHE_DIR = "/data/cve_cache"

# The cache is best-effort everywhere else in this module, so an unwritable
# location must not prevent the module from being imported.
try:
    os.makedirs(CACHE_DIR, exist_ok=True)
except OSError as _exc:
    _logger.warning("CVE cache directory %s is unavailable: %s", CACHE_DIR, _exc)

# Cached NVD responses are considered fresh for seven days.
_CACHE_TTL_SECONDS = 86400 * 7


@dataclass
class CVERecord:
    """Structured view of a single CVE entry."""

    cve_id: str
    description: str
    severity: str
    cvss_score: float
    cwe_ids: list[str]
    affected_products: list[str]
    published: str
    references: list[str]


# ──────────────────────────────────────────────────────────────
# KNOWN EXPLOIT PATTERNS — SSEC seed corpus
# Each entry: (pattern_name, CWE, regex_or_keywords, severity)
# Patterns are applied line by line, case-insensitively.
# ──────────────────────────────────────────────────────────────
_EXPLOIT_PATTERNS = [
    # Memory corruption
    ("buffer_overflow_c", "CWE-119", r"strcpy|strcat|sprintf|gets\s*\(|scanf\s*\(", "CRITICAL"),
    ("integer_overflow", "CWE-190", r"(\w+)\s*\*\s*(\w+)\s*(?:>|<|==)\s*\d+|malloc\s*\(\s*\w+\s*\*", "HIGH"),
    # The dereference alternatives are grouped so that both `->` and `[` follow
    # the freed name. Previously `|\1\[` was a separate top-level branch whose
    # backreference could never be set, so `free(p); p[0]` was never flagged.
    ("use_after_free", "CWE-416", r"free\s*\(\s*(\w+)\s*\).*\1\s*(?:->|\[)", "CRITICAL"),
    ("format_string", "CWE-134", r'printf\s*\(\s*(?!")[^,)]+\)|fprintf\s*\(\s*\w+\s*,\s*(?!")[^,)]+\)', "HIGH"),
    # NOTE(review): this expression is not a valid Python regex ("(?|" is an
    # unknown extension), so the entry is skipped at scan time. The intended
    # pattern cannot be recovered from this file; restore it from the
    # authoritative source.
    ("null_deref", "CWE-476", r"(\w+)\s*=\s*malloc\(.*(?|\1\[", "HIGH"),
    # Injection
    ("sql_injection_py", "CWE-89", r'execute\s*\(\s*["\'].*%s|cursor\.execute\s*\(.*format\s*\(|\.query\s*\(.*\+', "CRITICAL"),
    ("sql_injection_js", "CWE-89", r'query\s*\(`[^`]*\$\{|\.query\s*\(\s*["\'][^"\']*"\s*\+', "CRITICAL"),
    ("cmd_injection_py", "CWE-78", r'os\.system\s*\(.*\+|subprocess\.call\s*\(.*shell\s*=\s*True', "CRITICAL"),
    ("path_traversal", "CWE-22", r'open\s*\(.*\+.*\.\.|os\.path\.join.*request|send_file.*request', "HIGH"),
    ("xss_reflected", "CWE-79", r'innerHTML\s*=.*req\.|document\.write.*req\.|render.*template.*request', "HIGH"),
    ("xxe", "CWE-611", r'ElementTree\.parse|lxml\.etree\.parse|minidom\.parseString.*(?!defusedxml)', "HIGH"),
    ("ssrf", "CWE-918", r'requests\.get\s*\(.*request\.|urllib.*urlopen.*request\.|httpx.*get.*request', "HIGH"),
    # Crypto
    ("weak_hash", "CWE-328", r'hashlib\.md5\s*\(|hashlib\.sha1\s*\(|MD5\s*\(|SHA1\s*\(', "MEDIUM"),
    ("hardcoded_secret", "CWE-798", r'password\s*=\s*["\'][^"\']{4,}["\']|secret\s*=\s*["\'][^"\']{4,}["\']|api_key\s*=\s*["\'][^"\']{8,}["\']', "HIGH"),
    ("weak_random", "CWE-338", r'random\.random\s*\(|Math\.random\s*\(|rand\s*\(\)', "MEDIUM"),
    ("insecure_tls", "CWE-295", r'verify\s*=\s*False|ssl\._create_unverified_context|rejectUnauthorized\s*:\s*false', "HIGH"),
    # Deserialization
    ("pickle_deserial", "CWE-502", r'pickle\.loads\s*\(.*request|pickle\.load\s*\(.*request', "CRITICAL"),
    ("yaml_deserial", "CWE-502", r'yaml\.load\s*\((?!.*Loader\s*=\s*yaml\.SafeLoader)', "HIGH"),
    ("json_deserial_uncheck", "CWE-502", r'eval\s*\(.*JSON|eval\s*\(.*json', "HIGH"),
    # Auth/AuthZ
    ("missing_auth", "CWE-306", r'@app\.route.*(?!@login_required|@require_auth|@authenticated)', "MEDIUM"),
    ("jwt_none_alg", "CWE-347", r'algorithms\s*=\s*\[\s*["\']none["\']|decode.*options.*verify_signature.*False', "CRITICAL"),
    ("debug_mode_prod", "CWE-489", r'DEBUG\s*=\s*True|app\.run.*debug\s*=\s*True', "MEDIUM"),
    # Race conditions
    ("toctou", "CWE-367", r'os\.path\.exists.*open\s*\(|access.*open\s*\(', "MEDIUM"),
    ("race_condition_thread", "CWE-362", r'threading\.Thread.*shared_var|global\s+\w+.*thread', "MEDIUM"),
]

# Keyword sets used by the confidence heuristic.
_DANGEROUS_KEYWORDS = ("request", "input", "user", "external", "upload", "argv", "environ", "param")
_VALIDATION_KEYWORDS = ("sanitize", "escape", "validate", "encode", "filter", "check", "verify")
_TEST_INDICATORS = ("test_", "mock_", "fake_", "stub_", "assert", "unittest")

# Scan configuration.
_SOURCE_GLOBS = ("*.py", "*.js", "*.ts", "*.go", "*.java", "*.rb", "*.c", "*.cpp", "*.h")
_EXCLUDED_DIRS = frozenset({"node_modules", "site-packages", ".tox", ".git", "vendor"})
_SEVERITY_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}


@functools.lru_cache(maxsize=256)
def _compile_pattern(regex: str) -> Optional[re.Pattern]:
    """Compile *regex* case-insensitively; return None (warning once) if invalid."""
    try:
        return re.compile(regex, re.IGNORECASE)
    except re.error as exc:
        # The result is cached, so each broken pattern is reported only once.
        _logger.warning("Skipping invalid SSEC pattern %r: %s", regex, exc)
        return None


def _scan_file_for_patterns(file_path: str, source: str) -> list[dict]:
    """Scan a single file against all SSEC exploit patterns.

    Args:
        file_path: Path recorded in each finding (not opened here).
        source: Full text of the file.

    Returns:
        One dict per (pattern, line) match with keys ``pattern``, ``cwe``,
        ``severity``, ``file``, ``line`` (1-based), ``snippet`` (stripped line,
        at most 120 chars) and ``ssec_confidence``. Findings are grouped by
        pattern in corpus order, then by line number.
    """
    # Lines starting with "#" or "//" are treated as comments and never
    # reported. NOTE: this also drops C preprocessor lines such as "#define".
    candidates = []
    for lineno, line in enumerate(source.splitlines(), 1):
        stripped = line.strip()
        if not stripped.startswith(("#", "//")):
            candidates.append((lineno, line, stripped))

    findings = []
    for pattern_name, cwe, regex, severity in _EXPLOIT_PATTERNS:
        compiled = _compile_pattern(regex)
        if compiled is None:
            continue
        for lineno, line, stripped in candidates:
            if compiled.search(line):
                findings.append({
                    "pattern": pattern_name,
                    "cwe": cwe,
                    "severity": severity,
                    "file": file_path,
                    "line": lineno,
                    "snippet": stripped[:120],
                    "ssec_confidence": _compute_ssec_confidence(line, cwe),
                })
    return findings


def _compute_ssec_confidence(line: str, cwe: str) -> float:
    """
    SSEC confidence scoring — custom algorithm.

    Starts at 0.5 and adjusts for indicators found on the same line:
    +0.2 for a user-input keyword, -0.15 for a validation keyword, reset to
    0.1 when the line looks like a comment, -0.2 for a test indicator. The
    result is clamped to [0.1, 1.0] and rounded to three decimals.

    Args:
        line: The raw source line that matched a pattern.
        cwe: CWE identifier of the matched pattern (currently unused).
    """
    score = 0.5
    line_lower = line.lower()

    if any(kw in line_lower for kw in _DANGEROUS_KEYWORDS):
        score += 0.2
    if any(kw in line_lower for kw in _VALIDATION_KEYWORDS):
        score -= 0.15
    if line.strip().startswith(("#", "//", "*")):
        score = 0.1
    if any(marker in line_lower for marker in _TEST_INDICATORS):
        score -= 0.2

    return round(max(0.1, min(1.0, score)), 3)


def _is_excluded(rel_path: str) -> bool:
    """Return True if any directory component of *rel_path* is a vendored/VCS dir."""
    parts = rel_path.replace("\\", "/").split("/")
    return any(part in _EXCLUDED_DIRS for part in parts[:-1])


def run_ssec_scan(
    repo_dir: str,
    focus_files: Optional[list[str]] = None,
    max_files: Optional[int] = 200,
) -> dict:
    """
    SSEC (Semantic Similarity Exploit Chain) scan.

    Scans source files against known exploit patterns and returns findings
    ordered by severity, then confidence (both descending).

    Args:
        repo_dir: Root directory of the repository to scan.
        focus_files: Optional repo-relative paths; when given, only those that
            exist are scanned instead of globbing the whole tree.
        max_files: Upper bound on the number of candidate files scanned.
            ``None`` removes the cap.

    Returns:
        Dict with ``total_findings``, ``critical``, ``high``, ``unique_cwes``
        (sorted), ``top_findings`` (first 20), ``files_scanned`` (files that
        were actually read) and ``algorithm``.
    """
    if focus_files:
        candidates = [os.path.join(repo_dir, name) for name in focus_files]
        candidates = [path for path in candidates if os.path.exists(path)]
    else:
        candidates = []
        root = glob.escape(repo_dir)  # repo paths may contain glob metacharacters
        for ext in _SOURCE_GLOBS:
            # Sorted so the capped subset is stable from run to run.
            candidates.extend(sorted(glob.glob(os.path.join(root, "**", ext), recursive=True)))

    # Exclusions are matched against directory components of the repo-relative
    # path, so a checkout living under e.g. ".../project.git/" is still scanned
    # and look-alike names such as ".github" are not dropped by accident.
    targets = []
    for path in candidates:
        rel = os.path.relpath(path, repo_dir)
        if not _is_excluded(rel):
            targets.append((path, rel))

    all_findings = []
    files_scanned = 0
    for path, rel in targets[:max_files]:
        try:
            with open(path, encoding="utf-8", errors="replace") as handle:
                source = handle.read()
        except (OSError, ValueError):
            continue  # unreadable entries (directories, permissions) are skipped
        files_scanned += 1
        all_findings.extend(_scan_file_for_patterns(rel, source))

    all_findings.sort(
        key=lambda f: (_SEVERITY_RANK.get(f["severity"], 0), f["ssec_confidence"]),
        reverse=True,
    )

    return {
        "total_findings": len(all_findings),
        "critical": sum(1 for f in all_findings if f["severity"] == "CRITICAL"),
        "high": sum(1 for f in all_findings if f["severity"] == "HIGH"),
        "unique_cwes": sorted({f["cwe"] for f in all_findings}),
        "top_findings": all_findings[:20],
        "files_scanned": files_scanned,
        "algorithm": "SSEC_v1",
    }


def _load_cached_result(cache_file: str) -> Optional[dict]:
    """Return the cached NVD result if present, fresh and well-formed, else None."""
    try:
        if time.time() - os.path.getmtime(cache_file) >= _CACHE_TTL_SECONDS:
            return None
        with open(cache_file, encoding="utf-8") as handle:
            cached = json.load(handle)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache entries are treated as a miss.
        return None
    return cached if isinstance(cached, dict) else None


def _store_cached_result(cache_file: str, result: dict) -> None:
    """Persist *result* to *cache_file*; failures are logged and ignored."""
    try:
        with open(cache_file, "w", encoding="utf-8") as handle:
            json.dump(result, handle, indent=2)
    except (OSError, TypeError, ValueError) as exc:
        _logger.debug("Could not write CVE cache %s: %s", cache_file, exc)


def query_cve_intel(description: str, cwe_hint: str = None) -> dict:
    """
    Query NVD for CVEs similar to the given description.

    Successful NVD responses are cached on disk for seven days. Whenever the
    NVD result (fresh or cached) is an error or contains no CVEs, the local
    pattern database is used instead, so repeated calls with the same
    arguments behave consistently.

    Args:
        description: Free-text vulnerability description.
        cwe_hint: Optional CWE identifier (e.g. ``"CWE-89"``) to narrow results.

    Returns:
        Dict with at least ``cves``, ``source`` and ``total``.
    """
    cache_key = hashlib.sha256(f"{description}{cwe_hint}".encode()).hexdigest()[:16]
    cache_file = os.path.join(CACHE_DIR, f"{cache_key}.json")

    result = _load_cached_result(cache_file)
    if result is None:
        result = _query_nvd_api(description, cwe_hint)
        if not result.get("error"):
            _store_cached_result(cache_file, result)

    if result.get("error") or not result.get("cves"):
        result = _local_cve_lookup(description, cwe_hint)
    return result


def _parse_nvd_item(item: dict) -> dict:
    """Flatten one entry of the NVD ``vulnerabilities`` array into a summary dict."""
    cve = item.get("cve", {})
    metrics = cve.get("metrics", {})

    # Prefer CVSS v3.1 data, then v3.0; otherwise leave severity unknown.
    cvss_data = {}
    for metric_key in ("cvssMetricV31", "cvssMetricV30"):
        entries = metrics.get(metric_key)
        if entries:
            cvss_data = entries[0].get("cvssData", {})
            break

    desc_text = next(
        (d.get("value", "") for d in cve.get("descriptions", []) if d.get("lang") == "en"),
        "",
    )
    cwes = [
        w["description"][0].get("value", "")
        for w in cve.get("weaknesses", [])
        if w.get("description")
    ]
    return {
        "id": cve.get("id", ""),
        "description": desc_text[:300],
        "severity": cvss_data.get("baseSeverity", "UNKNOWN"),
        "cvss_score": cvss_data.get("baseScore", 0.0),
        "cwes": cwes,
        "published": cve.get("published", ""),
    }


def _query_nvd_api(description: str, cwe_hint: str = None) -> dict:
    """Query NVD 2.0 API.

    Returns ``{"cves": [...], "source": "nvd_api", "total": n}`` on success, or
    ``{"error": message, "cves": []}`` on any failure. Never raises.
    """
    if requests is None:
        return {"error": "requests library is not installed", "cves": []}

    params = {
        "keywordSearch": description[:100],
        "resultsPerPage": 5,
        "startIndex": 0,
    }
    if cwe_hint:
        params["cweId"] = cwe_hint
    headers = {"apiKey": NVD_API_KEY} if NVD_API_KEY else {}

    try:
        resp = requests.get(NVD_BASE, params=params, headers=headers, timeout=10)
        if resp.status_code != 200:
            return {"error": f"NVD API returned {resp.status_code}", "cves": []}
        data = resp.json()
        cves = [_parse_nvd_item(item) for item in data.get("vulnerabilities", [])[:5]]
        return {"cves": cves, "source": "nvd_api", "total": data.get("totalResults", 0)}
    except Exception as e:
        # Deliberately broad: this is the network/parsing boundary and callers
        # rely on an error dict (to trigger the offline fallback), not a raise.
        _logger.warning("NVD query failed: %s", e)
        return {"error": str(e), "cves": []}


# Offline fallback data.
# NOTE(review): the IDs below are synthetic "-analog"/placeholder labels, not
# authoritative NVD references (e.g. CVE-2021-44228 is Log4Shell, which is not
# a SQL injection). Treat them as illustrative class examples only.
_LOCAL_CVE_DB = {
    "CWE-89": [
        {"id": "CVE-2021-44228-analog", "description": "SQL injection via unsanitized user input in ORM query", "severity": "CRITICAL", "cvss_score": 9.8},
    ],
    "CWE-502": [
        {"id": "CVE-2019-20107-analog", "description": "Unsafe deserialization of user-controlled pickle data", "severity": "CRITICAL", "cvss_score": 9.8},
    ],
    "CWE-78": [
        {"id": "CVE-2021-3129-analog", "description": "OS command injection via unsanitized shell argument", "severity": "CRITICAL", "cvss_score": 9.8},
    ],
    "CWE-22": [
        {"id": "CVE-2018-1000116-analog", "description": "Path traversal via ../ in user-supplied file path", "severity": "HIGH", "cvss_score": 7.5},
    ],
    "CWE-79": [
        {"id": "CVE-2022-XXXX-xss", "description": "Reflected XSS via unescaped user input in HTML response", "severity": "HIGH", "cvss_score": 6.1},
    ],
    "CWE-798": [
        {"id": "CVE-2021-hardcoded", "description": "Hardcoded credentials in source code", "severity": "HIGH", "cvss_score": 7.5},
    ],
    "CWE-295": [
        {"id": "CVE-2021-tls-verify", "description": "TLS certificate verification disabled allowing MITM", "severity": "HIGH", "cvss_score": 7.4},
    ],
    "CWE-347": [
        {"id": "CVE-2020-jwt-none", "description": "JWT 'none' algorithm accepted allowing token forgery", "severity": "CRITICAL", "cvss_score": 9.8},
    ],
}

# Substring → CWE mapping; insertion order decides which keyword wins.
_KEYWORDS_TO_CWE = {
    "sql": "CWE-89",
    "pickle": "CWE-502",
    "deserializ": "CWE-502",
    "command": "CWE-78",
    "shell": "CWE-78",
    "path traversal": "CWE-22",
    "directory traversal": "CWE-22",
    "xss": "CWE-79",
    "cross-site": "CWE-79",
    "hardcoded": "CWE-798",
    "tls": "CWE-295",
    "ssl": "CWE-295",
    "jwt": "CWE-347",
    "token": "CWE-347",
}


def _local_cve_lookup(description: str, cwe_hint: str = None) -> dict:
    """
    Local CVE pattern database — offline fallback.

    Uses *cwe_hint* when given; otherwise picks the CWE of the first keyword
    (in ``_KEYWORDS_TO_CWE`` order) found in the lower-cased description.

    Returns:
        Dict with ``cves`` (fresh copies of the matching entries, possibly
        empty), ``source`` (``"local_db"``), ``matched_cwe`` and ``total``.
    """
    matched_cwe = cwe_hint
    if not matched_cwe:
        desc_lower = description.lower()
        matched_cwe = next(
            (cwe for kw, cwe in _KEYWORDS_TO_CWE.items() if kw in desc_lower),
            None,
        )

    # Copies keep callers from mutating the shared module-level table.
    cve_matches = [dict(entry) for entry in _LOCAL_CVE_DB.get(matched_cwe, [])] if matched_cwe else []

    return {
        "cves": cve_matches,
        "source": "local_db",
        "matched_cwe": matched_cwe,
        "total": len(cve_matches),
    }