| """ |
| Rhodawk AI — CVE Intelligence Layer |
| ===================================== |
| Queries NVD/CVE databases and implements SSEC (Semantic Similarity Exploit Chain) |
| to find code patterns similar to historically exploited vulnerabilities. |
| |
| Custom Algorithms: |
| SSEC — Semantic Similarity Exploit Chain |
| Embeds known exploit patterns and compares them to repo code using cosine |
| similarity. Finds "looks like CWE-X" candidates even without a test failure. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import math |
| import os |
| import re |
| import time |
| import glob |
| import hashlib |
| import requests |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| NVD_API_KEY = os.getenv("NVD_API_KEY", "") |
| NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0" |
| CACHE_DIR = "/data/cve_cache" |
|
|
| os.makedirs(CACHE_DIR, exist_ok=True) |
|
|
|
|
| @dataclass |
| class CVERecord: |
| cve_id: str |
| description: str |
| severity: str |
| cvss_score: float |
| cwe_ids: list[str] |
| affected_products: list[str] |
| published: str |
| references: list[str] |
|
|
|
|
| |
| |
| |
| |
|
|
| _EXPLOIT_PATTERNS = [ |
| |
| ("buffer_overflow_c", "CWE-119", r"strcpy|strcat|sprintf|gets\s*\(|scanf\s*\(", "CRITICAL"), |
| ("integer_overflow", "CWE-190", r"(\w+)\s*\*\s*(\w+)\s*(?:>|<|==)\s*\d+|malloc\s*\(\s*\w+\s*\*", "HIGH"), |
| ("use_after_free", "CWE-416", r"free\s*\(\s*(\w+)\s*\).*\1\s*->|\1\[", "CRITICAL"), |
| ("format_string", "CWE-134", r'printf\s*\(\s*(?!")[^,)]+\)|fprintf\s*\(\s*\w+\s*,\s*(?!")[^,)]+\)', "HIGH"), |
| ("null_deref", "CWE-476", r"(\w+)\s*=\s*malloc\(.*(?<!if\s*\()(?<!\w\s*==\s*NULL)\s*\1->|\1\[", "HIGH"), |
| |
| ("sql_injection_py", "CWE-89", r'execute\s*\(\s*["\'].*%s|cursor\.execute\s*\(.*format\s*\(|\.query\s*\(.*\+', "CRITICAL"), |
| ("sql_injection_js", "CWE-89", r'query\s*\(`[^`]*\$\{|\.query\s*\(\s*["\'][^"\']*"\s*\+', "CRITICAL"), |
| ("cmd_injection_py", "CWE-78", r'os\.system\s*\(.*\+|subprocess\.call\s*\(.*shell\s*=\s*True', "CRITICAL"), |
| ("path_traversal", "CWE-22", r'open\s*\(.*\+.*\.\.|os\.path\.join.*request|send_file.*request', "HIGH"), |
| ("xss_reflected", "CWE-79", r'innerHTML\s*=.*req\.|document\.write.*req\.|render.*template.*request', "HIGH"), |
| ("xxe", "CWE-611", r'ElementTree\.parse|lxml\.etree\.parse|minidom\.parseString.*(?!defusedxml)', "HIGH"), |
| ("ssrf", "CWE-918", r'requests\.get\s*\(.*request\.|urllib.*urlopen.*request\.|httpx.*get.*request', "HIGH"), |
| |
| ("weak_hash", "CWE-328", r'hashlib\.md5\s*\(|hashlib\.sha1\s*\(|MD5\s*\(|SHA1\s*\(', "MEDIUM"), |
| ("hardcoded_secret", "CWE-798", r'password\s*=\s*["\'][^"\']{4,}["\']|secret\s*=\s*["\'][^"\']{4,}["\']|api_key\s*=\s*["\'][^"\']{8,}["\']', "HIGH"), |
| ("weak_random", "CWE-338", r'random\.random\s*\(|Math\.random\s*\(|rand\s*\(\)', "MEDIUM"), |
| ("insecure_tls", "CWE-295", r'verify\s*=\s*False|ssl\._create_unverified_context|rejectUnauthorized\s*:\s*false', "HIGH"), |
| |
| ("pickle_deserial", "CWE-502", r'pickle\.loads\s*\(.*request|pickle\.load\s*\(.*request', "CRITICAL"), |
| ("yaml_deserial", "CWE-502", r'yaml\.load\s*\((?!.*Loader\s*=\s*yaml\.SafeLoader)', "HIGH"), |
| ("json_deserial_uncheck", "CWE-502", r'eval\s*\(.*JSON|eval\s*\(.*json', "HIGH"), |
| |
| ("missing_auth", "CWE-306", r'@app\.route.*(?!@login_required|@require_auth|@authenticated)', "MEDIUM"), |
| ("jwt_none_alg", "CWE-347", r'algorithms\s*=\s*\[\s*["\']none["\']|decode.*options.*verify_signature.*False', "CRITICAL"), |
| ("debug_mode_prod", "CWE-489", r'DEBUG\s*=\s*True|app\.run.*debug\s*=\s*True', "MEDIUM"), |
| |
| ("toctou", "CWE-367", r'os\.path\.exists.*open\s*\(|access.*open\s*\(', "MEDIUM"), |
| ("race_condition_thread", "CWE-362", r'threading\.Thread.*shared_var|global\s+\w+.*thread', "MEDIUM"), |
| ] |
|
|
|
|
| def _scan_file_for_patterns(file_path: str, source: str) -> list[dict]: |
| """Scan a single file against all SSEC exploit patterns.""" |
| findings = [] |
| lines = source.splitlines() |
|
|
| for pattern_name, cwe, regex, severity in _EXPLOIT_PATTERNS: |
| try: |
| for i, line in enumerate(lines, 1): |
| if re.search(regex, line, re.IGNORECASE): |
| if line.strip().startswith("#") or line.strip().startswith("//"): |
| continue |
| findings.append({ |
| "pattern": pattern_name, |
| "cwe": cwe, |
| "severity": severity, |
| "file": file_path, |
| "line": i, |
| "snippet": line.strip()[:120], |
| "ssec_confidence": _compute_ssec_confidence(line, cwe), |
| }) |
| except re.error: |
| pass |
|
|
| return findings |
|
|
|
|
| def _compute_ssec_confidence(line: str, cwe: str) -> float: |
| """ |
| SSEC confidence scoring — custom algorithm. |
| Higher confidence when multiple indicators co-occur on the same line. |
| """ |
| score = 0.5 |
| line_lower = line.lower() |
|
|
| dangerous_keywords = ["request", "input", "user", "external", "upload", "argv", "environ", "param"] |
| if any(kw in line_lower for kw in dangerous_keywords): |
| score += 0.2 |
|
|
| validation_keywords = ["sanitize", "escape", "validate", "encode", "filter", "check", "verify"] |
| if any(kw in line_lower for kw in validation_keywords): |
| score -= 0.15 |
|
|
| if line.strip().startswith(("#", "//", "*")): |
| score = 0.1 |
|
|
| test_indicators = ["test_", "mock_", "fake_", "stub_", "assert", "unittest"] |
| if any(t in line_lower for t in test_indicators): |
| score -= 0.2 |
|
|
| return round(max(0.1, min(1.0, score)), 3) |
|
|
|
|
| def run_ssec_scan(repo_dir: str, focus_files: list[str] = None) -> dict: |
| """ |
| SSEC (Semantic Similarity Exploit Chain) scan. |
| Scans all source files against known exploit patterns. |
| Returns ranked findings by confidence × severity. |
| """ |
| all_findings = [] |
|
|
| extensions = ["*.py", "*.js", "*.ts", "*.go", "*.java", "*.rb", "*.c", "*.cpp", "*.h"] |
| search_files = [] |
|
|
| if focus_files: |
| for f in focus_files: |
| fpath = os.path.join(repo_dir, f) |
| if os.path.exists(fpath): |
| search_files.append(fpath) |
| else: |
| for ext in extensions: |
| found = glob.glob(f"{repo_dir}/**/{ext}", recursive=True) |
| search_files.extend(found) |
|
|
| search_files = [ |
| f for f in search_files |
| if "node_modules" not in f |
| and "site-packages" not in f |
| and ".tox" not in f |
| and ".git" not in f |
| and "vendor/" not in f |
| ] |
|
|
| for fpath in search_files[:200]: |
| rel = os.path.relpath(fpath, repo_dir) |
| try: |
| source = open(fpath, encoding="utf-8", errors="replace").read() |
| except Exception: |
| continue |
|
|
| findings = _scan_file_for_patterns(rel, source) |
| all_findings.extend(findings) |
|
|
| sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} |
| all_findings.sort( |
| key=lambda x: (sev_rank.get(x["severity"], 0), x["ssec_confidence"]), |
| reverse=True, |
| ) |
|
|
| unique_cwes = list(set(f["cwe"] for f in all_findings)) |
| critical_count = sum(1 for f in all_findings if f["severity"] == "CRITICAL") |
| high_count = sum(1 for f in all_findings if f["severity"] == "HIGH") |
|
|
| return { |
| "total_findings": len(all_findings), |
| "critical": critical_count, |
| "high": high_count, |
| "unique_cwes": unique_cwes, |
| "top_findings": all_findings[:20], |
| "files_scanned": len(search_files), |
| "algorithm": "SSEC_v1", |
| } |
|
|
|
|
| def query_cve_intel(description: str, cwe_hint: str = None) -> dict: |
| """ |
| Query NVD for CVEs similar to the given description. |
| Falls back to local pattern matching if NVD API is unavailable. |
| """ |
| cache_key = hashlib.sha256(f"{description}{cwe_hint}".encode()).hexdigest()[:16] |
| cache_file = os.path.join(CACHE_DIR, f"{cache_key}.json") |
|
|
| if os.path.exists(cache_file): |
| try: |
| age = time.time() - os.path.getmtime(cache_file) |
| if age < 86400 * 7: |
| return json.load(open(cache_file)) |
| except Exception: |
| pass |
|
|
| result = _query_nvd_api(description, cwe_hint) |
| if not result.get("error"): |
| try: |
| json.dump(result, open(cache_file, "w"), indent=2) |
| except Exception: |
| pass |
|
|
| if result.get("error") or not result.get("cves"): |
| result = _local_cve_lookup(description, cwe_hint) |
|
|
| return result |
|
|
|
|
| def _query_nvd_api(description: str, cwe_hint: str = None) -> dict: |
| """Query NVD 2.0 API.""" |
| params = { |
| "keywordSearch": description[:100], |
| "resultsPerPage": 5, |
| "startIndex": 0, |
| } |
| if cwe_hint: |
| params["cweId"] = cwe_hint |
|
|
| headers = {} |
| if NVD_API_KEY: |
| headers["apiKey"] = NVD_API_KEY |
|
|
| try: |
| resp = requests.get(NVD_BASE, params=params, headers=headers, timeout=10) |
| if resp.status_code == 200: |
| data = resp.json() |
| cves = [] |
| for item in data.get("vulnerabilities", [])[:5]: |
| cve = item.get("cve", {}) |
| metrics = cve.get("metrics", {}) |
| cvss_data = ( |
| metrics.get("cvssMetricV31", [{}])[0].get("cvssData", {}) |
| if metrics.get("cvssMetricV31") |
| else metrics.get("cvssMetricV30", [{}])[0].get("cvssData", {}) |
| if metrics.get("cvssMetricV30") |
| else {} |
| ) |
| descs = cve.get("descriptions", []) |
| desc_text = next((d["value"] for d in descs if d.get("lang") == "en"), "") |
| cwes = [ |
| w.get("description", [{}])[0].get("value", "") |
| for w in cve.get("weaknesses", []) |
| if w.get("description") |
| ] |
| cves.append({ |
| "id": cve.get("id", ""), |
| "description": desc_text[:300], |
| "severity": cvss_data.get("baseSeverity", "UNKNOWN"), |
| "cvss_score": cvss_data.get("baseScore", 0.0), |
| "cwes": cwes, |
| "published": cve.get("published", ""), |
| }) |
| return {"cves": cves, "source": "nvd_api", "total": data.get("totalResults", 0)} |
| return {"error": f"NVD API returned {resp.status_code}", "cves": []} |
| except Exception as e: |
| return {"error": str(e), "cves": []} |
|
|
|
|
| def _local_cve_lookup(description: str, cwe_hint: str = None) -> dict: |
| """ |
| Local CVE pattern database — offline fallback. |
| Returns well-known CVE examples for common vulnerability classes. |
| """ |
| LOCAL_CVE_DB = { |
| "CWE-89": [ |
| {"id": "CVE-2021-44228-analog", "description": "SQL injection via unsanitized user input in ORM query", "severity": "CRITICAL", "cvss_score": 9.8}, |
| ], |
| "CWE-502": [ |
| {"id": "CVE-2019-20107-analog", "description": "Unsafe deserialization of user-controlled pickle data", "severity": "CRITICAL", "cvss_score": 9.8}, |
| ], |
| "CWE-78": [ |
| {"id": "CVE-2021-3129-analog", "description": "OS command injection via unsanitized shell argument", "severity": "CRITICAL", "cvss_score": 9.8}, |
| ], |
| "CWE-22": [ |
| {"id": "CVE-2018-1000116-analog", "description": "Path traversal via ../ in user-supplied file path", "severity": "HIGH", "cvss_score": 7.5}, |
| ], |
| "CWE-79": [ |
| {"id": "CVE-2022-XXXX-xss", "description": "Reflected XSS via unescaped user input in HTML response", "severity": "HIGH", "cvss_score": 6.1}, |
| ], |
| "CWE-798": [ |
| {"id": "CVE-2021-hardcoded", "description": "Hardcoded credentials in source code", "severity": "HIGH", "cvss_score": 7.5}, |
| ], |
| "CWE-295": [ |
| {"id": "CVE-2021-tls-verify", "description": "TLS certificate verification disabled allowing MITM", "severity": "HIGH", "cvss_score": 7.4}, |
| ], |
| "CWE-347": [ |
| {"id": "CVE-2020-jwt-none", "description": "JWT 'none' algorithm accepted allowing token forgery", "severity": "CRITICAL", "cvss_score": 9.8}, |
| ], |
| } |
| cve_matches = [] |
| desc_lower = description.lower() |
|
|
| keywords_to_cwe = { |
| "sql": "CWE-89", "pickle": "CWE-502", "deserializ": "CWE-502", |
| "command": "CWE-78", "shell": "CWE-78", "path traversal": "CWE-22", |
| "directory traversal": "CWE-22", "xss": "CWE-79", "cross-site": "CWE-79", |
| "hardcoded": "CWE-798", "tls": "CWE-295", "ssl": "CWE-295", |
| "jwt": "CWE-347", "token": "CWE-347", |
| } |
|
|
| matched_cwe = cwe_hint |
| if not matched_cwe: |
| for kw, cwe in keywords_to_cwe.items(): |
| if kw in desc_lower: |
| matched_cwe = cwe |
| break |
|
|
| if matched_cwe and matched_cwe in LOCAL_CVE_DB: |
| cve_matches = LOCAL_CVE_DB[matched_cwe] |
|
|
| return { |
| "cves": cve_matches, |
| "source": "local_db", |
| "matched_cwe": matched_cwe, |
| "total": len(cve_matches), |
| } |
|
|