""" Rhodawk AI — Exploit Primitive Reasoner ========================================= Given a crash or vulnerability candidate, reasons about: 1. Exploitability class (overflow, UAF, injection, race, crypto, logic) 2. Control flow impact (can attacker redirect execution?) 3. Data flow impact (can attacker read/write arbitrary memory?) 4. Proof-of-Concept generation (minimal triggerable input) 5. Severity and bounty tier estimate Uses DeepSeek-R1 (reasoning model) for deep exploit chain analysis. Never auto-submits — all output goes to the disclosure pipeline for human review. """ from __future__ import annotations import hashlib import json import os import time import requests from dataclasses import dataclass, field from typing import Optional OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") EXPLOIT_MODEL = os.getenv("HERMES_MODEL", "deepseek/deepseek-r1:free") OPENROUTER_BASE = "https://openrouter.ai/api/v1" @dataclass class ExploitAnalysis: vulnerability_id: str exploit_class: str # memory_corruption | injection | logic | crypto | race | info_disclosure control_flow_impact: str # full_control | partial_control | none data_impact: str # arbitrary_read_write | arbitrary_read | arbitrary_write | none auth_bypass_possible: bool remote_exploitable: bool exploit_complexity: str # LOW | MEDIUM | HIGH estimated_cvss: float bounty_tier: str # P1 ($10k+) | P2 ($5k) | P3 ($2k) | P4 (<$1k) proof_of_concept: str attack_scenario: str mitigations_present: list[str] confidence: float reasoning: str _EXPLOIT_SYSTEM = """You are a world-class exploit developer and vulnerability researcher. Given a crash, vulnerability description, or suspicious code pattern, analyze its exploitability with extreme precision. Think like an attacker trying to maximize impact. Consider: - Can the attacker control the input that triggers this? - Does this give code execution, information disclosure, or denial of service? - What mitigations are in place (ASLR, stack canaries, bounds checking, authentication)? - What is the minimal proof-of-concept that demonstrates the issue? - What is the realistic attack scenario (network, local, authenticated, unauthenticated)? Be honest about uncertainty. If you cannot determine exploitability, say so clearly. A false positive wastes a researcher's time and damages credibility. Respond in this exact JSON format: { "exploit_class": "memory_corruption|injection|logic|crypto|race|info_disclosure|denial_of_service", "control_flow_impact": "full_control|partial_control|none", "data_impact": "arbitrary_read_write|arbitrary_read|arbitrary_write|none", "auth_bypass_possible": true/false, "remote_exploitable": true/false, "exploit_complexity": "LOW|MEDIUM|HIGH", "estimated_cvss": 0.0-10.0, "bounty_tier": "P1|P2|P3|P4", "proof_of_concept": "minimal code or input that triggers the issue", "attack_scenario": "step by step how an attacker would exploit this", "mitigations_present": ["mitigation1", "mitigation2"], "confidence": 0.0-1.0, "reasoning": "your full chain of reasoning about exploitability" } """ _CVSS_TO_TIER = { (9.0, 10.0): "P1", (7.0, 8.9): "P2", (4.0, 6.9): "P3", (0.0, 3.9): "P4", } def _estimate_bounty_tier(cvss: float) -> str: for (lo, hi), tier in _CVSS_TO_TIER.items(): if lo <= cvss <= hi: return tier return "P4" def _call_exploit_reasoner(prompt: str) -> dict: if not OPENROUTER_API_KEY: return { "exploit_class": "unknown", "control_flow_impact": "unknown", "data_impact": "unknown", "auth_bypass_possible": False, "remote_exploitable": False, "exploit_complexity": "HIGH", "estimated_cvss": 0.0, "bounty_tier": "P4", "proof_of_concept": "OPENROUTER_API_KEY not set — manual analysis required", "attack_scenario": "", "mitigations_present": [], "confidence": 0.0, "reasoning": "API key not available", } headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", "HTTP-Referer": "https://rhodawk.ai", "X-Title": "Rhodawk Exploit Reasoner", } payload = { "model": EXPLOIT_MODEL, "messages": [ {"role": "system", "content": _EXPLOIT_SYSTEM}, {"role": "user", "content": prompt}, ], "temperature": 0.1, "max_tokens": 2000, "response_format": {"type": "json_object"}, } try: resp = requests.post( f"{OPENROUTER_BASE}/chat/completions", headers=headers, json=payload, timeout=120, ) resp.raise_for_status() content = resp.json()["choices"][0]["message"]["content"] return json.loads(content) except Exception as e: return { "exploit_class": "analysis_failed", "control_flow_impact": "unknown", "data_impact": "unknown", "auth_bypass_possible": False, "remote_exploitable": False, "exploit_complexity": "HIGH", "estimated_cvss": 0.0, "bounty_tier": "P4", "proof_of_concept": f"Analysis failed: {e}", "attack_scenario": "", "mitigations_present": [], "confidence": 0.0, "reasoning": str(e), } def reason_exploitability( crash_input: str, crash_output: str, file_path: str, vuln_type: str, source_context: str = "", ) -> dict: """ Main entry point for exploit primitive reasoning. Returns a detailed exploitability analysis. """ vuln_id = hashlib.sha256(f"{file_path}{crash_input}{time.time()}".encode()).hexdigest()[:12] prompt = ( f"VULNERABILITY TYPE: {vuln_type}\n" f"FILE: {file_path}\n\n" f"CRASH INPUT / TRIGGER:\n```\n{crash_input[:1000]}\n```\n\n" f"CRASH OUTPUT / STACK TRACE:\n```\n{crash_output[:2000]}\n```\n\n" ) if source_context: prompt += f"SOURCE CONTEXT:\n```\n{source_context[:1500]}\n```\n\n" prompt += "Analyze this vulnerability's exploitability in full detail." print(f"[EXPLOIT] Reasoning about {vuln_type} in {file_path} (ID: {vuln_id})") raw = _call_exploit_reasoner(prompt) cvss = float(raw.get("estimated_cvss", 0.0)) tier = raw.get("bounty_tier") or _estimate_bounty_tier(cvss) analysis = ExploitAnalysis( vulnerability_id=vuln_id, exploit_class=raw.get("exploit_class", "unknown"), control_flow_impact=raw.get("control_flow_impact", "none"), data_impact=raw.get("data_impact", "none"), auth_bypass_possible=bool(raw.get("auth_bypass_possible", False)), remote_exploitable=bool(raw.get("remote_exploitable", False)), exploit_complexity=raw.get("exploit_complexity", "HIGH"), estimated_cvss=round(cvss, 1), bounty_tier=tier, proof_of_concept=raw.get("proof_of_concept", ""), attack_scenario=raw.get("attack_scenario", ""), mitigations_present=raw.get("mitigations_present", []), confidence=float(raw.get("confidence", 0.0)), reasoning=raw.get("reasoning", "")[:1000], ) print(f"[EXPLOIT] {vuln_id}: CVSS={analysis.estimated_cvss}, Tier={analysis.bounty_tier}, " f"Confidence={analysis.confidence}") return { "vulnerability_id": analysis.vulnerability_id, "exploit_class": analysis.exploit_class, "control_flow_impact": analysis.control_flow_impact, "data_impact": analysis.data_impact, "auth_bypass_possible": analysis.auth_bypass_possible, "remote_exploitable": analysis.remote_exploitable, "exploit_complexity": analysis.exploit_complexity, "estimated_cvss": analysis.estimated_cvss, "bounty_tier": analysis.bounty_tier, "proof_of_concept": analysis.proof_of_concept, "attack_scenario": analysis.attack_scenario, "mitigations_present": analysis.mitigations_present, "confidence": analysis.confidence, "reasoning": analysis.reasoning, "disclosure_ready": analysis.confidence >= 0.7 and analysis.estimated_cvss >= 4.0, "requires_human_review": True, } def generate_cve_draft( finding_title: str, finding_description: str, exploit_analysis: dict, affected_repo: str, affected_versions: str = "unspecified", ) -> dict: """ Generate a CVE draft report ready for human review and submission. Output format follows CVE JSON 5.0 schema. """ cvss = exploit_analysis.get("estimated_cvss", 0.0) tier = exploit_analysis.get("bounty_tier", "P4") exploit_class = exploit_analysis.get("exploit_class", "unknown") cwe_map = { "memory_corruption": "CWE-119", "injection": "CWE-74", "logic": "CWE-840", "crypto": "CWE-310", "race": "CWE-362", "info_disclosure": "CWE-200", "denial_of_service": "CWE-400", } cwe = cwe_map.get(exploit_class, "CWE-UNKNOWN") severity = "CRITICAL" if cvss >= 9.0 else "HIGH" if cvss >= 7.0 else "MEDIUM" if cvss >= 4.0 else "LOW" draft = { "dataType": "CVE_RECORD", "dataVersion": "5.0", "cveMetadata": { "state": "DRAFT", "assignerOrgId": "rhodawk-ai", "dateReserved": time.strftime("%Y-%m-%dT%H:%M:%S.000Z"), }, "containers": { "cna": { "title": finding_title, "descriptions": [ { "lang": "en", "value": finding_description, } ], "affected": [ { "repo": f"https://github.com/{affected_repo}", "versions": [{"version": affected_versions, "status": "affected"}], } ], "problemTypes": [ {"descriptions": [{"type": "CWE", "cweId": cwe, "lang": "en"}]} ], "metrics": [ { "cvssV3_1": { "version": "3.1", "baseScore": cvss, "baseSeverity": severity, } } ], "references": [ {"url": f"https://github.com/{affected_repo}", "name": "Repository"} ], "x_rhodawk_metadata": { "bounty_tier": tier, "exploit_class": exploit_class, "disclosure_status": "PENDING_HUMAN_APPROVAL", "generated_by": "Rhodawk AI Security Research Engine", "requires_human_verification": True, "analyst_notes": exploit_analysis.get("reasoning", "")[:500], }, } }, } return { "cve_draft": draft, "severity": severity, "cvss": cvss, "bounty_tier": tier, "ready_for_human_review": True, "auto_submit": False, }