rhodawk-ai-devops-engine / lora_scheduler.py
Rhodawk Agent
fix(playbook §10): resolve all 12 diagnostic warnings W-001..W-012

W-001 CRITICAL: Add scripts/generate_stubs.sh + Makefile to generate
 openclaude_grpc/openclaude_pb2*.py locally without a full Docker build.
W-002 HIGH: Delete dead Rhodawk_AI_Pitch_Deck_2026.pptx (operator request).
W-003 HIGH: Rename mcp_config.json -> mcp_config.ARCHIVE.json with archive notice.
W-004 HIGH: formal_verifier.py — RHODAWK_Z3_ENABLED defaults to true; loud warning if skipped.
W-005 MEDIUM: Surface RHODAWK_AUTO_MERGE state in System Status banner.
W-006 MEDIUM: training_store.PgConn.executescript() splits multi-stmt SQL.
W-007 MEDIUM: webhook_server returns 403 + JSON when secret unset.
W-008 MEDIUM: hermes_orchestrator — new HERMES_PROVIDER routing flag.
W-009 MEDIUM: New night_hunt_lock.py shared mutex across both night-hunt loops.
W-010 MEDIUM: repo_harvester uses dynamic 30-day rolling pushed window.
W-011 MEDIUM: lora_scheduler emits Telegram/Slack notification on export.
W-012 MEDIUM: Live Operations tab shows System Status feature-gate banner.
6ed9a00
"""
Rhodawk AI — LoRA Fine-Tune Scheduler
======================================
Schedules periodic LoRA adapter fine-tuning runs using accumulated
(failure, fix) pairs from the training store.
This is NOT a training pipeline — it exports the training data in JSONL
format ready for consumption by:
- Hugging Face PEFT + TRL (local SFT)
- Hugging Face AutoTrain API
- OpenRouter/Together batch fine-tune API (when available)
The scheduler monitors fix_success_rate and triggers a training export
when enough new high-quality data has accumulated since the last run.
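Trigger conditions (any one sufficient):
- New successful fixes since the last export >= RHODAWK_LORA_MIN_SAMPLES
  (default 50)
- Time since the last export >= RHODAWK_LORA_MAX_AGE_HOURS (default 168 =
  1 week); this condition only applies once at least one export has run
Output artifact: $RHODAWK_LORA_OUTPUT_DIR/lora_training_data_{timestamp}.jsonl
(RHODAWK_LORA_OUTPUT_DIR defaults to /data/lora_exports)
Format: standard chat-format instruction tuning JSONL, one example per line:
{"messages": [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]}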
Enable: RHODAWK_LORA_ENABLED=true
"""
import json
import os
import sqlite3
import time
from contextlib import closing

from training_store import DB_PATH
# Master switch: the scheduler runs only when RHODAWK_LORA_ENABLED is "true"
# (case-insensitive; surrounding whitespace, e.g. from .env files, is ignored).
LORA_ENABLED = os.getenv("RHODAWK_LORA_ENABLED", "false").strip().lower() == "true"
# Number of new successful fixes since the last export that triggers a new one.
LORA_MIN_SAMPLES = int(os.getenv("RHODAWK_LORA_MIN_SAMPLES", "50"))
# Hours after the previous export at which an export is forced regardless of
# the sample count (only applies once at least one export has run).
LORA_MAX_AGE_H = int(os.getenv("RHODAWK_LORA_MAX_AGE_HOURS", "168"))
# Directory that receives the lora_training_data_{timestamp}.jsonl files.
LORA_OUTPUT_DIR = os.getenv("RHODAWK_LORA_OUTPUT_DIR", "/data/lora_exports")
# JSON file recording the last export time/count. This used to be hard-coded
# even though the export directory was configurable; the default is unchanged
# so existing deployments keep reading and writing the same file.
LORA_STATE_PATH = os.getenv(
    "RHODAWK_LORA_STATE_PATH", "/data/lora_scheduler_state.json"
)
# System message prepended to every exported chat example.
SYSTEM_PROMPT = (
    "You are Rhodawk, an expert AI software engineer specializing in debugging "
    "and fixing failing automated tests. You produce minimal, correct, secure "
    "code fixes. You think step-by-step about the root cause before proposing a fix."
)
def _load_state(path: "str | None" = None) -> dict:
    """Load the scheduler's persisted state.

    The returned dict always contains ``last_run_ts``, ``last_run_count`` and
    ``total_exports``. Any other keys stored in the file (for example
    ``last_export_path``) are passed through unchanged. A missing, unreadable
    or malformed file yields the zero-valued defaults.

    Args:
        path: State file to read; defaults to ``LORA_STATE_PATH``.

    Returns:
        A fresh dict on every call, safe for the caller to mutate.
    """
    state = {"last_run_ts": 0, "last_run_count": 0, "total_exports": 0}
    target = LORA_STATE_PATH if path is None else path
    try:
        with open(target, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # No file yet (first run), permission problems, undecodable bytes or
        # corrupt JSON (JSONDecodeError subclasses ValueError) -> defaults.
        return state
    if isinstance(loaded, dict):
        # Merge over the defaults so a partial or older-format file cannot
        # make callers that index these keys directly raise KeyError.
        state.update(loaded)
    return state
def _save_state(state: dict, path: "str | None" = None) -> None:
    """Persist *state* as pretty-printed JSON, atomically.

    The JSON is written to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``. A crash mid-write therefore cannot leave a
    truncated state file behind. A truncated file would otherwise be loaded
    as the zero-valued defaults, i.e. "never exported", which can re-trigger
    an export.

    Args:
        state: JSON-serializable scheduler state.
        path: Destination file; defaults to ``LORA_STATE_PATH``.

    Raises:
        OSError: if the directory or file cannot be written.
        TypeError: if *state* is not JSON-serializable (existing file is kept).
    """
    target = LORA_STATE_PATH if path is None else path
    parent = os.path.dirname(target)
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp_path = f"{target}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, target)
    except BaseException:
        # Don't leave a partial temp file behind; the original error propagates.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _count_good_fixes_since(since_count: int, db_path: "str | None" = None) -> int:
    """Count successful fix attempts added since the last export.

    ``since_count`` is the total number of successful attempts recorded at
    the last export, so the difference from the current total is the number
    added since then. The result is clamped at zero (e.g. if rows were
    deleted). Database errors yield 0 so the scheduler simply keeps waiting.

    Args:
        since_count: Successful-attempt total stored at the last export.
        db_path: SQLite database to query; defaults to ``DB_PATH``.

    Returns:
        Non-negative number of new successful attempts.
    """
    path = DB_PATH if db_path is None else db_path
    try:
        # sqlite3's connection context manager only commits/rolls back; it
        # does not close the connection, hence closing().
        with closing(sqlite3.connect(path)) as conn:
            total = conn.execute(
                "SELECT COUNT(*) FROM fix_attempts WHERE success_signal = 1"
            ).fetchone()[0]
    except sqlite3.Error as exc:
        print(f"[lora_scheduler] could not count good fixes: {exc}")
        return 0
    return max(0, total - since_count)
def _row_to_chat_example(
    failure_output: "str | None",
    diff_produced: "str | None",
    test_path: "str | None",
) -> dict:
    """Format one (failure, fix) pair as a system/user/assistant chat record."""
    failure = (failure_output or "")[:3000]
    # NOTE(review): slicing can cut a diff mid-hunk, turning very long fixes
    # into unappliable patches in the training target — consider skipping
    # such rows instead of truncating them.
    diff = (diff_produced or "")[:4000]
    test = test_path or "unknown"
    user_content = (
        "The following test is failing:\n"
        f"Test: `{test}`\n\n"
        f"Failure output:\n```\n{failure}\n```\n\n"
        "Produce a minimal, secure diff to make this test pass. "
        "Do NOT modify the test file itself."
    )
    assistant_content = (
        f"Here is the minimal fix:\n\n```diff\n{diff}\n```\n\n"
        "This fix addresses the root cause shown in the failure output."
    )
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": assistant_content},
        ]
    }


def _export_training_data(
    min_success: int = 1, limit: int = 2000, db_path: "str | None" = None
) -> list[dict]:
    """Export (failure, fix) pairs as chat-format messages.

    Selects up to *limit* of the most recent (by ``created_at``) fix attempts
    whose ``success_signal`` is at least *min_success* and that produced a
    non-empty diff.

    Args:
        min_success: Minimum ``success_signal`` for a row to be included.
        limit: Maximum number of rows to export.
        db_path: SQLite database to read; defaults to ``DB_PATH``.

    Returns:
        A list of ``{"messages": [...]}`` dicts, newest first; empty when the
        database cannot be read.
    """
    path = DB_PATH if db_path is None else db_path
    try:
        # closing(): the sqlite3 connection context manager does not close.
        with closing(sqlite3.connect(path)) as conn:
            conn.row_factory = sqlite3.Row
            # min_success was previously accepted but ignored (the query
            # hard-coded "= 1"). With the default of 1 the selected rows are
            # unchanged, assuming success_signal is 0/1 — TODO confirm schema.
            rows = conn.execute(
                """
                SELECT fa.failure_output, fa.diff_produced, fa.test_path
                FROM fix_attempts fa
                WHERE fa.success_signal >= ?
                  AND fa.diff_produced IS NOT NULL
                  AND fa.diff_produced != ''
                ORDER BY fa.created_at DESC
                LIMIT ?
                """,
                (min_success, limit),
            ).fetchall()
    except sqlite3.Error as exc:
        print(f"[lora_scheduler] could not read training data: {exc}")
        return []
    return [
        _row_to_chat_example(
            row["failure_output"], row["diff_produced"], row["test_path"]
        )
        for row in rows
    ]
def should_trigger_training() -> tuple[bool, str]:
    """Check whether a LoRA training-data export is due.

    Triggers when the scheduler is enabled and either at least
    ``LORA_MIN_SAMPLES`` new successful fixes have accumulated since the last
    export, or a previous export exists and is at least ``LORA_MAX_AGE_H``
    hours old.

    Returns:
        ``(should_run, reason)`` where *reason* is human-readable.
    """
    if not LORA_ENABLED:
        return False, "LoRA scheduler disabled (RHODAWK_LORA_ENABLED != true)"
    state = _load_state()
    # Read via .get() (as get_scheduler_status does) so a state file that is
    # missing a key cannot raise KeyError here.
    last_run_ts = state.get("last_run_ts", 0)
    age_hours = (time.time() - last_run_ts) / 3600
    new_fixes = _count_good_fixes_since(state.get("last_run_count", 0))
    if new_fixes >= LORA_MIN_SAMPLES:
        return True, f"{new_fixes} new good fixes (threshold: {LORA_MIN_SAMPLES})"
    never_exported = last_run_ts <= 0
    if not never_exported and age_hours >= LORA_MAX_AGE_H:
        return True, f"Max age {age_hours:.1f}h >= {LORA_MAX_AGE_H}h"
    # Before the first export the age is measured from the Unix epoch, which
    # would print a meaningless figure of hundreds of thousands of hours.
    if never_exported:
        elapsed = "no previous export"
    else:
        elapsed = f"{age_hours:.1f}h/{LORA_MAX_AGE_H}h elapsed"
    return False, (
        f"Not triggered: {new_fixes}/{LORA_MIN_SAMPLES} new fixes, "
        f"{elapsed}"
    )
def _notify_export(count: int, out_path: str, timestamp: str) -> None:
    """Send a best-effort Telegram/Slack notice that a new JSONL export is ready.

    W-011: operators are told about every successful export instead of files
    silently accumulating in the export directory. Any failure (no
    ``notifier`` module, missing Telegram/Slack credential, transport error)
    is printed and swallowed so it can never fail the export itself.
    """
    try:
        import notifier as _notifier  # type: ignore
        msg = (
            f"📦 *Rhodawk LoRA Export*\n"
            f"• Samples: `{count}`\n"
            f"• Path: `{out_path}`\n"
            f"• Timestamp: `{timestamp}`\n"
            f"Ready for HF PEFT/TRL/AutoTrain consumption."
        )
        if hasattr(_notifier, "notify"):
            _notifier.notify(msg)  # type: ignore[attr-defined]
        elif hasattr(_notifier, "send_telegram"):
            _notifier.send_telegram(msg)  # type: ignore[attr-defined]
        # NOTE(review): if notifier.notify() already fans out to Slack, this
        # sends a duplicate Slack message — confirm against notifier.py.
        if hasattr(_notifier, "send_slack"):
            _notifier.send_slack(msg)  # type: ignore[attr-defined]
    except Exception as exc:
        print(f"[lora_scheduler] notification dispatch skipped: {exc}")


def _snapshot_good_fix_total(fallback: int) -> int:
    """Return the current count of successful fix attempts, or *fallback* on DB error."""
    try:
        # closing(): the sqlite3 connection context manager does not close.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            return conn.execute(
                "SELECT COUNT(*) FROM fix_attempts WHERE success_signal = 1"
            ).fetchone()[0]
    except sqlite3.Error:
        return fallback


def run_training_export() -> dict:
    """Export training data to a timestamped JSONL file.

    Creates ``LORA_OUTPUT_DIR`` if needed, writes one JSON example per line,
    notifies operators, and records the export in the scheduler state.

    Returns:
        ``{"status": "ok", "path", "count", "timestamp"}`` after a successful
        export, or ``{"status": "skipped", "reason", "path": None,
        "count": 0}`` when there is nothing to export.

    Raises:
        OSError: if the export file or the state file cannot be written.
    """
    os.makedirs(LORA_OUTPUT_DIR, exist_ok=True)
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    out_path = os.path.join(LORA_OUTPUT_DIR, f"lora_training_data_{timestamp}.jsonl")
    examples = _export_training_data()
    if not examples:
        return {
            "status": "skipped",
            "reason": "no training examples available",
            "path": None,
            "count": 0,
        }
    # Write under a temporary name and rename into place so anything reading
    # the export directory never observes a half-written JSONL file.
    tmp_path = f"{out_path}.part"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.writelines(json.dumps(ex) + "\n" for ex in examples)
        os.replace(tmp_path, out_path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    _notify_export(len(examples), out_path, timestamp)
    state = _load_state()
    # Record the current total so the next "new fixes" count starts from
    # here; keep the previous value if the database can't be read.
    total_good = _snapshot_good_fix_total(state.get("last_run_count", 0))
    _save_state({
        "last_run_ts": time.time(),
        "last_run_count": total_good,
        "total_exports": state.get("total_exports", 0) + 1,
        "last_export_path": out_path,
        "last_export_count": len(examples),
    })
    return {
        "status": "ok",
        "path": out_path,
        "count": len(examples),
        "timestamp": timestamp,
    }
def maybe_trigger_training() -> str:
    """Run an export if one is due; intended to be called after every audit cycle.

    Returns:
        A one-line status message for the dashboard describing whether the
        scheduler is waiting, exported, or skipped the export.
    """
    due, why = should_trigger_training()
    if due:
        outcome = run_training_export()
        if outcome["status"] != "ok":
            return f"LoRA export skipped: {outcome.get('reason', 'unknown')}"
        return (
            f"LoRA export triggered: {outcome['count']} examples → {outcome['path']} "
            f"({why})"
        )
    return f"LoRA scheduler: waiting — {why}"
def get_scheduler_status() -> str:
    """Render a multi-line, human-readable scheduler summary for the dashboard.

    When the scheduler is disabled, only a one-line hint on how to enable it
    is returned.
    """
    if not LORA_ENABLED:
        return "LoRA scheduler disabled. Set RHODAWK_LORA_ENABLED=true to enable."
    st = _load_state()
    pending = _count_good_fixes_since(st.get("last_run_count", 0))
    last_ts = st.get("last_run_ts", 0)
    if last_ts > 0:
        when = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(last_ts))
    else:
        when = "never"
    report = [
        "LoRA Scheduler Status:",
        f" Last export: {when} ({st.get('last_export_count', 0)} examples)",
        f" Total exports: {st.get('total_exports', 0)}",
        f" New good fixes since last export: {pending}/{LORA_MIN_SAMPLES}",
        f" Last file: {st.get('last_export_path', 'none')}",
        f" Min samples threshold: {LORA_MIN_SAMPLES}",
        f" Max age: {LORA_MAX_AGE_H}h",
    ]
    # Every line, including the last one, is newline-terminated.
    return "".join(f"{line}\n" for line in report)