feat: implement incremental pentest data persistence

This commit is contained in:
Ahmed Allam
2025-11-22 22:28:14 +04:00
committed by Ahmed Allam
parent 86e6ed49bb
commit 5befb32318
5 changed files with 48 additions and 33 deletions

1
.gitignore vendored
View File

@@ -79,6 +79,7 @@ logs/
tensorboard/ tensorboard/
# Agent execution traces # Agent execution traces
strix_runs/
agent_runs/ agent_runs/
# Misc # Misc

View File

@@ -80,7 +80,7 @@ strix --target ./app-directory
``` ```
> [!NOTE] > [!NOTE]
> First run automatically pulls the sandbox Docker image. Results are saved to `agent_runs/<run-name>` > First run automatically pulls the sandbox Docker image. Results are saved to `strix_runs/<run-name>`
## ☁️ Run Strix in Cloud ## ☁️ Run Strix in Cloud

View File

@@ -36,7 +36,7 @@ async def run_cli(args: Any) -> None: # noqa: PLR0915
results_text = Text() results_text = Text()
results_text.append("📊 Results will be saved to: ", style="bold cyan") results_text.append("📊 Results will be saved to: ", style="bold cyan")
results_text.append(f"agent_runs/{args.run_name}", style="bold white") results_text.append(f"strix_runs/{args.run_name}", style="bold white")
note_text = Text() note_text = Text()
note_text.append("\n\n", style="dim") note_text.append("\n\n", style="dim")

View File

@@ -475,7 +475,7 @@ def main() -> None:
else: else:
asyncio.run(run_tui(args)) asyncio.run(run_tui(args))
results_path = Path("agent_runs") / args.run_name results_path = Path("strix_runs") / args.run_name
display_completion_message(args, results_path) display_completion_message(args, results_path)
if args.non_interactive: if args.non_interactive:

View File

@@ -50,6 +50,7 @@ class Tracer:
self._run_dir: Path | None = None self._run_dir: Path | None = None
self._next_execution_id = 1 self._next_execution_id = 1
self._next_message_id = 1 self._next_message_id = 1
self._saved_vuln_ids: set[str] = set()
self.vulnerability_found_callback: Callable[[str, str, str, str], None] | None = None self.vulnerability_found_callback: Callable[[str, str, str, str], None] | None = None
@@ -59,7 +60,7 @@ class Tracer:
def get_run_dir(self) -> Path: def get_run_dir(self) -> Path:
if self._run_dir is None: if self._run_dir is None:
runs_dir = Path.cwd() / "agent_runs" runs_dir = Path.cwd() / "strix_runs"
runs_dir.mkdir(exist_ok=True) runs_dir.mkdir(exist_ok=True)
run_dir_name = self.run_name if self.run_name else self.run_id run_dir_name = self.run_name if self.run_name else self.run_id
@@ -92,6 +93,7 @@ class Tracer:
report_id, title.strip(), content.strip(), severity.lower().strip() report_id, title.strip(), content.strip(), severity.lower().strip()
) )
self.save_run_data()
return report_id return report_id
def set_final_scan_result( def set_final_scan_result(
@@ -108,6 +110,7 @@ class Tracer:
} }
logger.info(f"Set final scan result: success={success}") logger.info(f"Set final scan result: success={success}")
self.save_run_data(mark_complete=True)
def log_agent_creation( def log_agent_creation(
self, agent_id: str, name: str, task: str, parent_id: str | None = None self, agent_id: str, name: str, task: str, parent_id: str | None = None
@@ -197,11 +200,13 @@ class Tracer:
"max_iterations": config.get("max_iterations", 200), "max_iterations": config.get("max_iterations", 200),
} }
) )
self.get_run_dir()
def save_run_data(self) -> None: def save_run_data(self, mark_complete: bool = False) -> None:
try: try:
run_dir = self.get_run_dir() run_dir = self.get_run_dir()
self.end_time = datetime.now(UTC).isoformat() if mark_complete:
self.end_time = datetime.now(UTC).isoformat()
if self.final_scan_result: if self.final_scan_result:
penetration_test_report_file = run_dir / "penetration_test_report.md" penetration_test_report_file = run_dir / "penetration_test_report.md"
@@ -219,13 +224,13 @@ class Tracer:
vuln_dir = run_dir / "vulnerabilities" vuln_dir = run_dir / "vulnerabilities"
vuln_dir.mkdir(exist_ok=True) vuln_dir.mkdir(exist_ok=True)
severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} new_reports = [
sorted_reports = sorted( report
self.vulnerability_reports, for report in self.vulnerability_reports
key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]), if report["id"] not in self._saved_vuln_ids
) ]
for report in sorted_reports: for report in new_reports:
vuln_file = vuln_dir / f"{report['id']}.md" vuln_file = vuln_dir / f"{report['id']}.md"
with vuln_file.open("w", encoding="utf-8") as f: with vuln_file.open("w", encoding="utf-8") as f:
f.write(f"# {report['title']}\n\n") f.write(f"# {report['title']}\n\n")
@@ -234,30 +239,39 @@ class Tracer:
f.write(f"**Found:** {report['timestamp']}\n\n") f.write(f"**Found:** {report['timestamp']}\n\n")
f.write("## Description\n\n") f.write("## Description\n\n")
f.write(f"{report['content']}\n") f.write(f"{report['content']}\n")
self._saved_vuln_ids.add(report["id"])
vuln_csv_file = run_dir / "vulnerabilities.csv" if self.vulnerability_reports:
with vuln_csv_file.open("w", encoding="utf-8", newline="") as f: severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}
import csv sorted_reports = sorted(
self.vulnerability_reports,
key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]),
)
fieldnames = ["id", "title", "severity", "timestamp", "file"] vuln_csv_file = run_dir / "vulnerabilities.csv"
writer = csv.DictWriter(f, fieldnames=fieldnames) with vuln_csv_file.open("w", encoding="utf-8", newline="") as f:
writer.writeheader() import csv
for report in sorted_reports: fieldnames = ["id", "title", "severity", "timestamp", "file"]
writer.writerow( writer = csv.DictWriter(f, fieldnames=fieldnames)
{ writer.writeheader()
"id": report["id"],
"title": report["title"],
"severity": report["severity"].upper(),
"timestamp": report["timestamp"],
"file": f"vulnerabilities/{report['id']}.md",
}
)
logger.info( for report in sorted_reports:
f"Saved {len(self.vulnerability_reports)} vulnerability reports to: {vuln_dir}" writer.writerow(
) {
logger.info(f"Saved vulnerability index to: {vuln_csv_file}") "id": report["id"],
"title": report["title"],
"severity": report["severity"].upper(),
"timestamp": report["timestamp"],
"file": f"vulnerabilities/{report['id']}.md",
}
)
if new_reports:
logger.info(
f"Saved {len(new_reports)} new vulnerability report(s) to: {vuln_dir}"
)
logger.info(f"Updated vulnerability index: {vuln_csv_file}")
logger.info(f"📊 Essential scan data saved to: {run_dir}") logger.info(f"📊 Essential scan data saved to: {run_dir}")
@@ -320,4 +334,4 @@ class Tracer:
} }
def cleanup(self) -> None: def cleanup(self) -> None:
self.save_run_data() self.save_run_data(mark_complete=True)