Architect8999 committed on
Commit
ebb0f4d
·
verified ·
1 Parent(s): 1fbcec2

Upload red_team_fuzzer.py

Browse files
Files changed (1) hide show
  1. red_team_fuzzer.py +104 -75
red_team_fuzzer.py CHANGED
@@ -48,6 +48,7 @@ import hashlib
48
  import json
49
  import os
50
  import re
 
51
  import signal
52
  import subprocess
53
  import sys
@@ -57,11 +58,14 @@ import threading
57
  import time
58
  from dataclasses import dataclass, field
59
  from pathlib import Path
60
- from typing import Callable, Optional
61
 
62
  import requests
63
  from tenacity import retry, stop_after_attempt, wait_exponential
64
 
 
 
 
65
  # ──────────────────────────────────────────────────────────────
66
  # CONFIGURATION & SECRETS
67
  # ──────────────────────────────────────────────────────────────
@@ -119,6 +123,63 @@ def get_red_team_logs(n: int = 100) -> str:
119
  return "\n".join(_rt_logs[-n:])
120
 
121
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  # ──────────────────────────────────────────────────────────────
123
  # DATA STRUCTURES
124
  # ──────────────────────────────────────────────────────────────
@@ -215,10 +276,6 @@ def _compute_cyclomatic_complexity(source: str) -> float:
215
  pass
216
 
217
  # Fallback: count decision points
218
- branch_keywords = {
219
- "if", "elif", "for", "while", "except", "with",
220
- "and", "or", "not", "assert",
221
- }
222
  score = 1.0
223
  try:
224
  tree = ast.parse(source)
@@ -409,8 +466,6 @@ def analyze_repository_ast(repo_dir: str) -> list[FuzzTarget]:
409
  rte_log(f"Scanning AST of repository: {repo_dir}", "AST")
410
 
411
  targets: list[FuzzTarget] = []
412
- source_dirs = ["src", "lib", "core", "app", "utils", "engine", "api"]
413
-
414
  candidate_files: list[Path] = []
415
  repo_path = Path(repo_dir)
416
 
@@ -440,20 +495,17 @@ def analyze_repository_ast(repo_dir: str) -> list[FuzzTarget]:
440
  rte_log(f"SyntaxError in {rel_path}: {e}", "WARN")
441
  continue
442
 
443
- # Extract module-level source lines for function slicing
444
  source_lines = source.splitlines()
445
 
446
  for node in ast.walk(tree):
447
  if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
448
  continue
449
- # Skip private/dunder/tiny functions
450
  name = node.name
451
  if name.startswith("__") and name.endswith("__"):
452
  continue
453
  if not node.args.args and not node.args.vararg:
454
  continue # No arguments = nothing to fuzz
455
 
456
- # Extract function source
457
  try:
458
  fn_start = node.lineno - 1
459
  fn_end = node.end_lineno if hasattr(node, "end_lineno") else fn_start + 30
@@ -464,7 +516,6 @@ def analyze_repository_ast(repo_dir: str) -> list[FuzzTarget]:
464
  if len(fn_source.strip()) < 20:
465
  continue
466
 
467
- # Build signature string
468
  try:
469
  sig = ast.unparse(node) if hasattr(ast, "unparse") else name
470
  sig = sig.split("\n")[0].rstrip(":")
@@ -473,17 +524,11 @@ def analyze_repository_ast(repo_dir: str) -> list[FuzzTarget]:
473
  except Exception:
474
  sig = f"def {name}(...)"
475
 
476
- # Extract docstring
477
  docstring = ast.get_docstring(node) or ""
478
-
479
- # Extract arg types and return type
480
  arg_types = _extract_arg_types(node)
481
  return_type = _extract_return_type(node)
482
-
483
- # Compute complexity on the function slice only
484
  complexity = _compute_cyclomatic_complexity(fn_source)
485
 
486
- # Feature flags
487
  has_loops = any(
488
  isinstance(n, (ast.For, ast.While, ast.AsyncFor))
489
  for n in ast.walk(node)
@@ -526,15 +571,17 @@ def analyze_repository_ast(repo_dir: str) -> list[FuzzTarget]:
526
  attack_rationale=rationale,
527
  ))
528
 
529
- # Sort by priority descending
530
  targets.sort(key=lambda t: t.attack_priority, reverse=True)
531
 
532
- rte_log(
533
- f"AST analysis complete: {len(targets)} attack targets ranked. "
534
- f"Top target: {targets[0].profile.function_name if targets else 'none'} "
535
- f"(score={targets[0].attack_priority:.3f})" if targets else "No targets found.",
536
- "AST"
537
- )
 
 
 
538
 
539
  return targets[:MAX_TARGETS_PER_RUN]
540
 
@@ -666,16 +713,17 @@ def _clean_llm_test_output(raw: str) -> str:
666
  Strip markdown fences and extract raw Python from LLM response.
667
  The LLM is instructed not to use markdown, but be defensive.
668
  """
669
- # Remove ```python ... ``` blocks
670
  raw = re.sub(r"```(?:python)?\s*\n?", "", raw)
671
  raw = re.sub(r"```\s*$", "", raw, flags=re.MULTILINE)
672
- # Remove leading/trailing commentary lines that aren't Python
673
  lines = raw.splitlines()
674
  code_lines = []
675
  in_code = False
676
  for line in lines:
677
  stripped = line.strip()
678
- if stripped.startswith("#") or stripped.startswith("import") or stripped.startswith("from") or stripped.startswith("def ") or stripped.startswith("@") or stripped.startswith(" ") or stripped == "" or in_code:
 
 
 
679
  in_code = True
680
  code_lines.append(line)
681
  elif in_code:
@@ -707,7 +755,6 @@ def synthesize_pbt(
707
  raw_response = _call_red_team_llm(_RED_TEAM_SYSTEM_PROMPT, user_prompt, model)
708
  except Exception as e:
709
  rte_log(f"LLM call failed for {target.profile.function_name}: {e}", "WARN")
710
- # Fallback to strong model on failure
711
  if not use_strong_model:
712
  try:
713
  raw_response = _call_red_team_llm(
@@ -721,7 +768,6 @@ def synthesize_pbt(
721
 
722
  test_code = _clean_llm_test_output(raw_response)
723
 
724
- # Validate it looks like Python with hypothesis
725
  if "from hypothesis" not in test_code and "import hypothesis" not in test_code:
726
  rte_log(
727
  f"LLM output for {target.profile.function_name} doesn't contain hypothesis imports — retrying",
@@ -730,20 +776,17 @@ def synthesize_pbt(
730
  return None
731
 
732
  if "def test_" not in test_code:
733
- rte_log(f"LLM output missing test function — retrying", "WARN")
734
  return None
735
 
736
- # Extract test function name
737
  fn_match = re.search(r"def (test_\w+)\(", test_code)
738
  test_fn_name = fn_match.group(1) if fn_match else "test_invariant"
739
 
740
- # Extract invariant description from docstring or comment
741
  inv_match = re.search(r'"""([^"]{10,200}?)"""', test_code)
742
  if not inv_match:
743
  inv_match = re.search(r"#\s*(.{10,120})", test_code)
744
  invariant_desc = inv_match.group(1).strip() if inv_match else "property invariant"
745
 
746
- # Extract hypothesis strategy
747
  strategy_match = re.search(r"@given\((.{5,200}?)\)", test_code)
748
  strategy = strategy_match.group(1) if strategy_match else "unknown"
749
 
@@ -767,11 +810,15 @@ def synthesize_pbt(
767
  # SECTION 3: DETERMINISTIC FUZZING LOOP
768
  # ──────────────────────────────────────────────────────────────
769
 
770
- def _install_hypothesis_if_needed(pytest_bin: str, repo_dir: str) -> bool:
771
- """Ensure hypothesis is installed in the target venv."""
 
 
 
 
772
  try:
773
  check = subprocess.run(
774
- [pytest_bin.replace("pytest", "python"), "-c", "import hypothesis"],
775
  capture_output=True, timeout=15, cwd=repo_dir,
776
  )
777
  if check.returncode == 0:
@@ -779,9 +826,9 @@ def _install_hypothesis_if_needed(pytest_bin: str, repo_dir: str) -> bool:
779
  except Exception:
780
  pass
781
 
782
- rte_log("Installing hypothesis into target venv...", "FUZZ")
 
783
  try:
784
- pip_bin = pytest_bin.replace("pytest", "pip")
785
  result = subprocess.run(
786
  [pip_bin, "install", "hypothesis", "--quiet"],
787
  capture_output=True, timeout=120, cwd=repo_dir,
@@ -846,7 +893,6 @@ def _extract_falsifying_example(output: str) -> str:
846
  if m:
847
  return m.group(1).strip()[:500]
848
 
849
- # Fallback: extract any exception line
850
  for line in output.splitlines():
851
  if "Error" in line or "assert" in line.lower():
852
  return line.strip()[:300]
@@ -870,7 +916,7 @@ def _extract_crash_type(output: str) -> str:
870
  return "type_error"
871
  if "ValueError" in output:
872
  return "value_error"
873
- if "MemoryError" in output or "MemoryError" in output:
874
  return "memory_exhaustion"
875
  if "FAILED" in output:
876
  return "assertion"
@@ -885,7 +931,6 @@ def _extract_survived_inputs(output: str) -> list[str]:
885
  survived = []
886
  for m in re.finditer(r"Trying example.*?\((.+?)\)", output):
887
  survived.append(m.group(1)[:100])
888
- # Also include any explicit example lines
889
  for m in re.finditer(r"explicit example.*?\((.+?)\)", output, re.IGNORECASE):
890
  survived.append(m.group(1)[:100])
891
  return survived[:20]
@@ -895,7 +940,7 @@ def run_fuzzing_loop(
895
  pbt: GeneratedPBT,
896
  target: FuzzTarget,
897
  repo_dir: str,
898
- pytest_bin: str,
899
  ) -> tuple[bool, str, str]:
900
  """
901
  Execute the generated PBT via subprocess with hypothesis aggressive settings.
@@ -905,8 +950,8 @@ def run_fuzzing_loop(
905
 
906
  Security: shell=False enforced, secrets stripped from env, SIGKILL on timeout.
907
  """
908
- # Write test file
909
  test_file = _write_pbt_to_file(pbt, target, repo_dir)
 
910
 
911
  rte_log(
912
  f"Fuzzing: {target.profile.function_name} | "
@@ -915,7 +960,6 @@ def run_fuzzing_loop(
915
  "FUZZ"
916
  )
917
 
918
- # Build environment — secrets stripped, hypothesis settings injected
919
  env = os.environ.copy()
920
  for secret_key in [
921
  "OPENROUTER_API_KEY", "GITHUB_TOKEN", "GITHUB_PERSONAL_ACCESS_TOKEN",
@@ -923,10 +967,8 @@ def run_fuzzing_loop(
923
  ]:
924
  env.pop(secret_key, None)
925
 
926
- # Hypothesis configuration via env (overrides @settings decorator)
927
  env["HYPOTHESIS_MAX_EXAMPLES"] = str(FUZZ_MAX_EXAMPLES)
928
  env["HYPOTHESIS_VERBOSITY"] = "verbose"
929
- # Deterministic but varied seed per CEGIS round
930
  seed = (hash(target.profile.function_name) + pbt.cegis_round * 7919) % (2**31)
931
  env["HYPOTHESIS_SEED"] = str(abs(seed))
932
 
@@ -937,7 +979,7 @@ def run_fuzzing_loop(
937
  "--tb=long",
938
  "--no-header",
939
  f"--hypothesis-seed={abs(seed)}",
940
- "-x", # Stop at first failure
941
  ]
942
 
943
  proc = None
@@ -990,7 +1032,6 @@ def run_fuzzing_loop(
990
  rte_log(f"Fuzzing subprocess error for {target.profile.function_name}: {e}", "FAIL")
991
  return False, str(e), ""
992
  finally:
993
- # Clean up test file if no crash (keep if crash for audit trail)
994
  if proc and proc.returncode == 0:
995
  try:
996
  os.unlink(test_file)
@@ -1013,8 +1054,6 @@ def _build_synthetic_failing_test(crash: CrashPayload, repo_dir: str) -> str:
1013
  p = crash.target.profile
1014
  example = crash.falsifying_example
1015
 
1016
- # Parse the falsifying example into argument assignments
1017
- # hypothesis formats it as: x=42, y=-1, s='hello'
1018
  arg_setup_lines = []
1019
  example_clean = example.strip().rstrip(")")
1020
  for part in re.split(r",\s*(?=[a-zA-Z_]\w*=)", example_clean):
@@ -1023,9 +1062,8 @@ def _build_synthetic_failing_test(crash: CrashPayload, repo_dir: str) -> str:
1023
  arg_setup_lines.append(f" {part}")
1024
 
1025
  if not arg_setup_lines:
1026
- # Fallback: use the raw example as a comment
1027
  arg_setup_lines = [f" # Falsifying example: {example}"]
1028
- arg_call = ", ".join(f"None" for _ in p.arg_types)
1029
  else:
1030
  arg_call = ", ".join(
1031
  part.strip().split("=")[0] for part in arg_setup_lines if "=" in part
@@ -1082,8 +1120,6 @@ def _build_synthetic_failing_test(crash: CrashPayload, repo_dir: str) -> str:
1082
  # Blue Team: fix the implementation so this assertion holds for all inputs
1083
  try:
1084
  result = {p.function_name}({arg_call})
1085
- # If the crash was an assertion in the PBT, re-check the invariant
1086
- # The Blue Team must make this deterministic test pass
1087
  assert result is not None or result is None, (
1088
  f"Function returned unexpected result: {{result!r}}"
1089
  )
@@ -1115,7 +1151,6 @@ def package_crash_for_blue_team(
1115
  crash_hash = hashlib.sha256(crash_raw.encode()).hexdigest()[:16]
1116
  crash_type = _extract_crash_type(crash_output)
1117
 
1118
- # Write synthetic deterministic failing test
1119
  fn_safe = target.profile.function_name.replace("-", "_")
1120
  synthetic_filename = f"test_rt_zero_day_{fn_safe}_{crash_hash}.py"
1121
  synthetic_path = os.path.join(RED_TEAM_DIR, synthetic_filename)
@@ -1151,7 +1186,7 @@ def package_crash_for_blue_team(
1151
  def handoff_to_blue_team(
1152
  crash: CrashPayload,
1153
  repo_dir: str,
1154
- pytest_bin: str,
1155
  mcp_config_path: str,
1156
  job_id: str,
1157
  branch_name: str,
@@ -1170,7 +1205,9 @@ def handoff_to_blue_team(
1170
  "HAND"
1171
  )
1172
 
1173
- # First verify the synthetic test actually fails (confirms reproducibility)
 
 
1174
  env = os.environ.copy()
1175
  for secret_key in ["OPENROUTER_API_KEY", "GITHUB_TOKEN", "GITHUB_PERSONAL_ACCESS_TOKEN"]:
1176
  env.pop(secret_key, None)
@@ -1188,8 +1225,8 @@ def handoff_to_blue_team(
1188
 
1189
  if verify_proc.returncode == 0:
1190
  rte_log(
1191
- f"WARNING: Synthetic test PASSED on first run — crash may not be deterministic. "
1192
- f"Reporting anyway for human review.",
1193
  "WARN"
1194
  )
1195
  initial_failure_output = (
@@ -1198,17 +1235,14 @@ def handoff_to_blue_team(
1198
  f"Falsifying example: {crash.falsifying_example}"
1199
  )
1200
 
1201
- # Relative path of the synthetic test within repo context
1202
  rel_synthetic_test = os.path.relpath(crash.synthetic_test_path, repo_dir)
1203
-
1204
  rte_log(f"Dispatching Blue Team on: {rel_synthetic_test}", "HAND")
1205
 
1206
- # Call the Blue Team process_failing_test function
1207
  try:
1208
- blue_result = blue_team_fn(
1209
  test_path=rel_synthetic_test,
1210
  initial_failure=initial_failure_output,
1211
- pytest_bin=pytest_bin,
1212
  mcp_config_path=mcp_config_path,
1213
  job_id=job_id,
1214
  branch_name=branch_name,
@@ -1254,7 +1288,7 @@ def handoff_to_blue_team(
1254
 
1255
  def run_red_team_cegis(
1256
  repo_dir: str,
1257
- pytest_bin: str,
1258
  mcp_config_path: str,
1259
  blue_team_fn: Callable,
1260
  tenant_id: str = "default",
@@ -1269,7 +1303,7 @@ def run_red_team_cegis(
1269
 
1270
  Args:
1271
  repo_dir: Absolute path to the cloned target repository
1272
- pytest_bin: Path to pytest binary in the isolated venv
1273
  mcp_config_path: Path to MCP runtime config for Blue Team Aider
1274
  blue_team_fn: process_failing_test() from app.py (Blue Team entry point)
1275
  tenant_id: Namespace for job queue
@@ -1283,8 +1317,8 @@ def run_red_team_cegis(
1283
 
1284
  result = RedTeamResult(repo_dir=repo_dir, targets_analyzed=0)
1285
 
1286
- # Ensure hypothesis is available
1287
- if not _install_hypothesis_if_needed(pytest_bin, repo_dir):
1288
  rte_log("hypothesis not available — Red Team cannot run without it", "FAIL")
1289
  return result
1290
 
@@ -1331,7 +1365,6 @@ def run_red_team_cegis(
1331
  for cegis_round in range(1, MAX_CEGIS_ROUNDS + 1):
1332
  rte_log(f"CEGIS round {cegis_round}/{MAX_CEGIS_ROUNDS} for {target.profile.function_name}", "CEGIS")
1333
 
1334
- # Synthesize PBT — use strong model on final rounds
1335
  use_strong = (cegis_round >= MAX_CEGIS_ROUNDS - 1)
1336
  pbt = synthesize_pbt(target, cegis_round, survived_inputs, use_strong)
1337
 
@@ -1342,13 +1375,11 @@ def run_red_team_cegis(
1342
  result.total_fuzz_examples += FUZZ_MAX_EXAMPLES
1343
  result.cegis_rounds += 1
1344
 
1345
- # Run the fuzzer
1346
  crashed, crash_output, falsifying_example = run_fuzzing_loop(
1347
- pbt, target, repo_dir, pytest_bin
1348
  )
1349
 
1350
  if crashed and falsifying_example != "timeout":
1351
- # CRASH FOUND — package and hand off to Blue Team
1352
  crash_payload = package_crash_for_blue_team(
1353
  target=target,
1354
  pbt=pbt,
@@ -1379,7 +1410,6 @@ def run_red_team_cegis(
1379
  f"Handing to Blue Team for autonomous patching..."
1380
  )
1381
 
1382
- # CEGIS HANDOFF — dispatch Blue Team
1383
  job_id_rt = hashlib.sha256(
1384
  f"{repo_dir}:{target.profile.function_name}:{crash_payload.crash_hash}".encode()
1385
  ).hexdigest()[:16]
@@ -1391,7 +1421,7 @@ def run_red_team_cegis(
1391
  handoff_result = handoff_to_blue_team(
1392
  crash=crash_payload,
1393
  repo_dir=repo_dir,
1394
- pytest_bin=pytest_bin,
1395
  mcp_config_path=mcp_config_path,
1396
  job_id=job_id_rt,
1397
  branch_name=branch_name_rt,
@@ -1408,7 +1438,6 @@ def run_red_team_cegis(
1408
  break
1409
 
1410
  else:
1411
- # No crash — feed survived inputs back for next CEGIS round
1412
  new_survived = _extract_survived_inputs(crash_output)
1413
  survived_inputs.extend(new_survived)
1414
  survived_inputs = survived_inputs[:30]
 
48
  import json
49
  import os
50
  import re
51
+ import shutil
52
  import signal
53
  import subprocess
54
  import sys
 
58
  import time
59
  from dataclasses import dataclass, field
60
  from pathlib import Path
61
+ from typing import Callable, Optional, TYPE_CHECKING
62
 
63
  import requests
64
  from tenacity import retry, stop_after_attempt, wait_exponential
65
 
66
+ if TYPE_CHECKING:
67
+ from language_runtime import EnvConfig
68
+
69
  # ──────────────────────────────────────────────────────────────
70
  # CONFIGURATION & SECRETS
71
  # ──────────────────────────────────────────────────────────────
 
123
  return "\n".join(_rt_logs[-n:])
124
 
125
 
126
+ # ──────────────────────────────────────────────────────────────
127
+ # ENV CONFIG BINARY RESOLVERS
128
+ # Extracts concrete binary paths from an EnvConfig object so the
129
+ # fuzzing subprocess calls can work regardless of language runtime.
130
+ # ──────────────────────────────────────────────────────────────
131
+
132
def _get_runner_bin(env_config: "EnvConfig") -> str:
    """
    Resolve the pytest-compatible test runner binary from an EnvConfig.

    Resolution order:
      1. The first of ``runner_bin`` / ``pytest_bin`` / ``test_bin`` on
         ``env_config`` that names an existing file.
      2. A ``pytest`` executable inside ``env_config.venv_dir`` (POSIX
         ``bin/`` layout first, then the Windows ``Scripts/`` layout).
      3. ``pytest`` found on the system PATH, or the bare name ``"pytest"``
         when nothing is found.

    Args:
        env_config: Object exposing any of the attributes above; missing
            attributes are tolerated (read via ``getattr`` with a default).

    Returns:
        Path (or bare command name) of the test runner as a string.
    """
    for attr in ("runner_bin", "pytest_bin", "test_bin"):
        val = getattr(env_config, attr, None)
        if val and os.path.isfile(str(val)):
            return str(val)

    venv_dir = getattr(env_config, "venv_dir", None)
    if venv_dir:
        # POSIX venvs keep executables in bin/, Windows venvs in Scripts\
        # (normally with an .exe suffix). Check POSIX first so existing
        # behaviour is unchanged where bin/pytest exists.
        for subdir, exe_name in (
            ("bin", "pytest"),
            ("Scripts", "pytest.exe"),
            ("Scripts", "pytest"),
        ):
            candidate = os.path.join(str(venv_dir), subdir, exe_name)
            if os.path.isfile(candidate):
                return candidate

    return shutil.which("pytest") or "pytest"
147
+
148
+
149
def _get_python_bin(env_config: "EnvConfig") -> str:
    """
    Resolve the Python interpreter binary from an EnvConfig.

    Resolution order:
      1. The first of ``python_bin`` / ``interpreter`` / ``python`` on
         ``env_config`` that names an existing file.
      2. A ``python`` executable inside ``env_config.venv_dir`` (POSIX
         ``bin/`` layout first, then the Windows ``Scripts/`` layout).
      3. The interpreter running the current process (``sys.executable``).

    Args:
        env_config: Object exposing any of the attributes above; missing
            attributes are tolerated (read via ``getattr`` with a default).

    Returns:
        Path of the interpreter as a string.
    """
    for attr in ("python_bin", "interpreter", "python"):
        val = getattr(env_config, attr, None)
        if val and os.path.isfile(str(val)):
            return str(val)

    venv_dir = getattr(env_config, "venv_dir", None)
    if venv_dir:
        # POSIX venvs keep executables in bin/, Windows venvs in Scripts\
        # (normally with an .exe suffix). Check POSIX first so existing
        # behaviour is unchanged where bin/python exists.
        for subdir, exe_name in (
            ("bin", "python"),
            ("Scripts", "python.exe"),
            ("Scripts", "python"),
        ):
            candidate = os.path.join(str(venv_dir), subdir, exe_name)
            if os.path.isfile(candidate):
                return candidate

    return sys.executable
164
+
165
+
166
def _get_pip_bin(env_config: "EnvConfig") -> str:
    """
    Resolve the pip binary from an EnvConfig.

    Resolution order:
      1. The first of ``pip_bin`` / ``pip`` on ``env_config`` that names an
         existing file.
      2. A ``pip`` executable inside ``env_config.venv_dir`` (POSIX ``bin/``
         layout first, then the Windows ``Scripts/`` layout).
      3. ``pip`` found on the system PATH, or the bare name ``"pip"`` when
         nothing is found.

    Args:
        env_config: Object exposing any of the attributes above; missing
            attributes are tolerated (read via ``getattr`` with a default).

    Returns:
        Path (or bare command name) of pip as a string.
    """
    for attr in ("pip_bin", "pip"):
        val = getattr(env_config, attr, None)
        if val and os.path.isfile(str(val)):
            return str(val)

    venv_dir = getattr(env_config, "venv_dir", None)
    if venv_dir:
        # POSIX venvs keep executables in bin/, Windows venvs in Scripts\
        # (normally with an .exe suffix). Check POSIX first so existing
        # behaviour is unchanged where bin/pip exists.
        for subdir, exe_name in (
            ("bin", "pip"),
            ("Scripts", "pip.exe"),
            ("Scripts", "pip"),
        ):
            candidate = os.path.join(str(venv_dir), subdir, exe_name)
            if os.path.isfile(candidate):
                return candidate

    return shutil.which("pip") or "pip"
181
+
182
+
183
  # ──────────────────────────────────────────────────────────────
184
  # DATA STRUCTURES
185
  # ──────────────────────────────────────────────────────────────
 
276
  pass
277
 
278
  # Fallback: count decision points
 
 
 
 
279
  score = 1.0
280
  try:
281
  tree = ast.parse(source)
 
466
  rte_log(f"Scanning AST of repository: {repo_dir}", "AST")
467
 
468
  targets: list[FuzzTarget] = []
 
 
469
  candidate_files: list[Path] = []
470
  repo_path = Path(repo_dir)
471
 
 
495
  rte_log(f"SyntaxError in {rel_path}: {e}", "WARN")
496
  continue
497
 
 
498
  source_lines = source.splitlines()
499
 
500
  for node in ast.walk(tree):
501
  if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
502
  continue
 
503
  name = node.name
504
  if name.startswith("__") and name.endswith("__"):
505
  continue
506
  if not node.args.args and not node.args.vararg:
507
  continue # No arguments = nothing to fuzz
508
 
 
509
  try:
510
  fn_start = node.lineno - 1
511
  fn_end = node.end_lineno if hasattr(node, "end_lineno") else fn_start + 30
 
516
  if len(fn_source.strip()) < 20:
517
  continue
518
 
 
519
  try:
520
  sig = ast.unparse(node) if hasattr(ast, "unparse") else name
521
  sig = sig.split("\n")[0].rstrip(":")
 
524
  except Exception:
525
  sig = f"def {name}(...)"
526
 
 
527
  docstring = ast.get_docstring(node) or ""
 
 
528
  arg_types = _extract_arg_types(node)
529
  return_type = _extract_return_type(node)
 
 
530
  complexity = _compute_cyclomatic_complexity(fn_source)
531
 
 
532
  has_loops = any(
533
  isinstance(n, (ast.For, ast.While, ast.AsyncFor))
534
  for n in ast.walk(node)
 
571
  attack_rationale=rationale,
572
  ))
573
 
 
574
  targets.sort(key=lambda t: t.attack_priority, reverse=True)
575
 
576
+ if targets:
577
+ rte_log(
578
+ f"AST analysis complete: {len(targets)} attack targets ranked. "
579
+ f"Top target: {targets[0].profile.function_name} "
580
+ f"(score={targets[0].attack_priority:.3f})",
581
+ "AST"
582
+ )
583
+ else:
584
+ rte_log("AST analysis complete: No targets found.", "AST")
585
 
586
  return targets[:MAX_TARGETS_PER_RUN]
587
 
 
713
  Strip markdown fences and extract raw Python from LLM response.
714
  The LLM is instructed not to use markdown, but be defensive.
715
  """
 
716
  raw = re.sub(r"```(?:python)?\s*\n?", "", raw)
717
  raw = re.sub(r"```\s*$", "", raw, flags=re.MULTILINE)
 
718
  lines = raw.splitlines()
719
  code_lines = []
720
  in_code = False
721
  for line in lines:
722
  stripped = line.strip()
723
+ if (stripped.startswith("#") or stripped.startswith("import") or
724
+ stripped.startswith("from") or stripped.startswith("def ") or
725
+ stripped.startswith("@") or stripped.startswith(" ") or
726
+ stripped == "" or in_code):
727
  in_code = True
728
  code_lines.append(line)
729
  elif in_code:
 
755
  raw_response = _call_red_team_llm(_RED_TEAM_SYSTEM_PROMPT, user_prompt, model)
756
  except Exception as e:
757
  rte_log(f"LLM call failed for {target.profile.function_name}: {e}", "WARN")
 
758
  if not use_strong_model:
759
  try:
760
  raw_response = _call_red_team_llm(
 
768
 
769
  test_code = _clean_llm_test_output(raw_response)
770
 
 
771
  if "from hypothesis" not in test_code and "import hypothesis" not in test_code:
772
  rte_log(
773
  f"LLM output for {target.profile.function_name} doesn't contain hypothesis imports — retrying",
 
776
  return None
777
 
778
  if "def test_" not in test_code:
779
+ rte_log("LLM output missing test function — retrying", "WARN")
780
  return None
781
 
 
782
  fn_match = re.search(r"def (test_\w+)\(", test_code)
783
  test_fn_name = fn_match.group(1) if fn_match else "test_invariant"
784
 
 
785
  inv_match = re.search(r'"""([^"]{10,200}?)"""', test_code)
786
  if not inv_match:
787
  inv_match = re.search(r"#\s*(.{10,120})", test_code)
788
  invariant_desc = inv_match.group(1).strip() if inv_match else "property invariant"
789
 
 
790
  strategy_match = re.search(r"@given\((.{5,200}?)\)", test_code)
791
  strategy = strategy_match.group(1) if strategy_match else "unknown"
792
 
 
810
  # SECTION 3: DETERMINISTIC FUZZING LOOP
811
  # ──────────────────────────────────────────────────────────────
812
 
813
+ def _install_hypothesis_if_needed(env_config: "EnvConfig", repo_dir: str) -> bool:
814
+ """
815
+ Ensure hypothesis is installed in the target environment.
816
+ Uses EnvConfig to resolve the correct python/pip binaries.
817
+ """
818
+ python_bin = _get_python_bin(env_config)
819
  try:
820
  check = subprocess.run(
821
+ [python_bin, "-c", "import hypothesis"],
822
  capture_output=True, timeout=15, cwd=repo_dir,
823
  )
824
  if check.returncode == 0:
 
826
  except Exception:
827
  pass
828
 
829
+ rte_log("Installing hypothesis into target env...", "FUZZ")
830
+ pip_bin = _get_pip_bin(env_config)
831
  try:
 
832
  result = subprocess.run(
833
  [pip_bin, "install", "hypothesis", "--quiet"],
834
  capture_output=True, timeout=120, cwd=repo_dir,
 
893
  if m:
894
  return m.group(1).strip()[:500]
895
 
 
896
  for line in output.splitlines():
897
  if "Error" in line or "assert" in line.lower():
898
  return line.strip()[:300]
 
916
  return "type_error"
917
  if "ValueError" in output:
918
  return "value_error"
919
+ if "MemoryError" in output: # FIX: was duplicated "MemoryError in output or MemoryError in output"
920
  return "memory_exhaustion"
921
  if "FAILED" in output:
922
  return "assertion"
 
931
  survived = []
932
  for m in re.finditer(r"Trying example.*?\((.+?)\)", output):
933
  survived.append(m.group(1)[:100])
 
934
  for m in re.finditer(r"explicit example.*?\((.+?)\)", output, re.IGNORECASE):
935
  survived.append(m.group(1)[:100])
936
  return survived[:20]
 
940
  pbt: GeneratedPBT,
941
  target: FuzzTarget,
942
  repo_dir: str,
943
+ env_config: "EnvConfig", # FIX: was pytest_bin: str
944
  ) -> tuple[bool, str, str]:
945
  """
946
  Execute the generated PBT via subprocess with hypothesis aggressive settings.
 
950
 
951
  Security: shell=False enforced, secrets stripped from env, SIGKILL on timeout.
952
  """
 
953
  test_file = _write_pbt_to_file(pbt, target, repo_dir)
954
+ pytest_bin = _get_runner_bin(env_config)
955
 
956
  rte_log(
957
  f"Fuzzing: {target.profile.function_name} | "
 
960
  "FUZZ"
961
  )
962
 
 
963
  env = os.environ.copy()
964
  for secret_key in [
965
  "OPENROUTER_API_KEY", "GITHUB_TOKEN", "GITHUB_PERSONAL_ACCESS_TOKEN",
 
967
  ]:
968
  env.pop(secret_key, None)
969
 
 
970
  env["HYPOTHESIS_MAX_EXAMPLES"] = str(FUZZ_MAX_EXAMPLES)
971
  env["HYPOTHESIS_VERBOSITY"] = "verbose"
 
972
  seed = (hash(target.profile.function_name) + pbt.cegis_round * 7919) % (2**31)
973
  env["HYPOTHESIS_SEED"] = str(abs(seed))
974
 
 
979
  "--tb=long",
980
  "--no-header",
981
  f"--hypothesis-seed={abs(seed)}",
982
+ "-x",
983
  ]
984
 
985
  proc = None
 
1032
  rte_log(f"Fuzzing subprocess error for {target.profile.function_name}: {e}", "FAIL")
1033
  return False, str(e), ""
1034
  finally:
 
1035
  if proc and proc.returncode == 0:
1036
  try:
1037
  os.unlink(test_file)
 
1054
  p = crash.target.profile
1055
  example = crash.falsifying_example
1056
 
 
 
1057
  arg_setup_lines = []
1058
  example_clean = example.strip().rstrip(")")
1059
  for part in re.split(r",\s*(?=[a-zA-Z_]\w*=)", example_clean):
 
1062
  arg_setup_lines.append(f" {part}")
1063
 
1064
  if not arg_setup_lines:
 
1065
  arg_setup_lines = [f" # Falsifying example: {example}"]
1066
+ arg_call = ", ".join("None" for _ in p.arg_types)
1067
  else:
1068
  arg_call = ", ".join(
1069
  part.strip().split("=")[0] for part in arg_setup_lines if "=" in part
 
1120
  # Blue Team: fix the implementation so this assertion holds for all inputs
1121
  try:
1122
  result = {p.function_name}({arg_call})
 
 
1123
  assert result is not None or result is None, (
1124
  f"Function returned unexpected result: {{result!r}}"
1125
  )
 
1151
  crash_hash = hashlib.sha256(crash_raw.encode()).hexdigest()[:16]
1152
  crash_type = _extract_crash_type(crash_output)
1153
 
 
1154
  fn_safe = target.profile.function_name.replace("-", "_")
1155
  synthetic_filename = f"test_rt_zero_day_{fn_safe}_{crash_hash}.py"
1156
  synthetic_path = os.path.join(RED_TEAM_DIR, synthetic_filename)
 
1186
  def handoff_to_blue_team(
1187
  crash: CrashPayload,
1188
  repo_dir: str,
1189
+ env_config: "EnvConfig", # FIX: was pytest_bin: str
1190
  mcp_config_path: str,
1191
  job_id: str,
1192
  branch_name: str,
 
1205
  "HAND"
1206
  )
1207
 
1208
+ pytest_bin = _get_runner_bin(env_config)
1209
+
1210
+ # Verify synthetic test actually fails (confirms reproducibility)
1211
  env = os.environ.copy()
1212
  for secret_key in ["OPENROUTER_API_KEY", "GITHUB_TOKEN", "GITHUB_PERSONAL_ACCESS_TOKEN"]:
1213
  env.pop(secret_key, None)
 
1225
 
1226
  if verify_proc.returncode == 0:
1227
  rte_log(
1228
+ "WARNING: Synthetic test PASSED on first run — crash may not be deterministic. "
1229
+ "Reporting anyway for human review.",
1230
  "WARN"
1231
  )
1232
  initial_failure_output = (
 
1235
  f"Falsifying example: {crash.falsifying_example}"
1236
  )
1237
 
 
1238
  rel_synthetic_test = os.path.relpath(crash.synthetic_test_path, repo_dir)
 
1239
  rte_log(f"Dispatching Blue Team on: {rel_synthetic_test}", "HAND")
1240
 
 
1241
  try:
1242
+ blue_result = blue_team_fn( # FIX: was pytest_bin=pytest_bin
1243
  test_path=rel_synthetic_test,
1244
  initial_failure=initial_failure_output,
1245
+ env_config=env_config,
1246
  mcp_config_path=mcp_config_path,
1247
  job_id=job_id,
1248
  branch_name=branch_name,
 
1288
 
1289
  def run_red_team_cegis(
1290
  repo_dir: str,
1291
+ env_config: "EnvConfig", # FIX: was pytest_bin: str
1292
  mcp_config_path: str,
1293
  blue_team_fn: Callable,
1294
  tenant_id: str = "default",
 
1303
 
1304
  Args:
1305
  repo_dir: Absolute path to the cloned target repository
1306
+ env_config: EnvConfig from language_runtime provides test runner, python, pip
1307
  mcp_config_path: Path to MCP runtime config for Blue Team Aider
1308
  blue_team_fn: process_failing_test() from app.py (Blue Team entry point)
1309
  tenant_id: Namespace for job queue
 
1317
 
1318
  result = RedTeamResult(repo_dir=repo_dir, targets_analyzed=0)
1319
 
1320
+ # Ensure hypothesis is available in the target environment
1321
+ if not _install_hypothesis_if_needed(env_config, repo_dir):
1322
  rte_log("hypothesis not available — Red Team cannot run without it", "FAIL")
1323
  return result
1324
 
 
1365
  for cegis_round in range(1, MAX_CEGIS_ROUNDS + 1):
1366
  rte_log(f"CEGIS round {cegis_round}/{MAX_CEGIS_ROUNDS} for {target.profile.function_name}", "CEGIS")
1367
 
 
1368
  use_strong = (cegis_round >= MAX_CEGIS_ROUNDS - 1)
1369
  pbt = synthesize_pbt(target, cegis_round, survived_inputs, use_strong)
1370
 
 
1375
  result.total_fuzz_examples += FUZZ_MAX_EXAMPLES
1376
  result.cegis_rounds += 1
1377
 
 
1378
  crashed, crash_output, falsifying_example = run_fuzzing_loop(
1379
+ pbt, target, repo_dir, env_config # FIX: was pytest_bin
1380
  )
1381
 
1382
  if crashed and falsifying_example != "timeout":
 
1383
  crash_payload = package_crash_for_blue_team(
1384
  target=target,
1385
  pbt=pbt,
 
1410
  f"Handing to Blue Team for autonomous patching..."
1411
  )
1412
 
 
1413
  job_id_rt = hashlib.sha256(
1414
  f"{repo_dir}:{target.profile.function_name}:{crash_payload.crash_hash}".encode()
1415
  ).hexdigest()[:16]
 
1421
  handoff_result = handoff_to_blue_team(
1422
  crash=crash_payload,
1423
  repo_dir=repo_dir,
1424
+ env_config=env_config, # FIX: was pytest_bin=pytest_bin
1425
  mcp_config_path=mcp_config_path,
1426
  job_id=job_id_rt,
1427
  branch_name=branch_name_rt,
 
1438
  break
1439
 
1440
  else:
 
1441
  new_survived = _extract_survived_inputs(crash_output)
1442
  survived_inputs.extend(new_survived)
1443
  survived_inputs = survived_inputs[:30]