diff --git a/poetry.lock b/poetry.lock
index c3753e2..c3b5fec 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1157,6 +1157,18 @@ ssh = ["bcrypt (>=3.1.5)"]
test = ["certifi (>=2024)", "cryptography-vectors (==44.0.1)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"]
test-randomorder = ["pytest-randomly"]
+[[package]]
+name = "cvss"
+version = "3.6"
+description = "CVSS2/3/4 library with interactive calculator for Python 2 and Python 3"
+optional = false
+python-versions = "*"
+groups = ["main"]
+files = [
+ {file = "cvss-3.6-py2.py3-none-any.whl", hash = "sha256:e342c6ad9c7eb69d2aebbbc2768a03cabd57eb947c806e145de5b936219833ea"},
+ {file = "cvss-3.6.tar.gz", hash = "sha256:f21d18224efcd3c01b44ff1b37dec2e3208d29a6d0ce6c87a599c73c21ee1a99"},
+]
+
[[package]]
name = "cycler"
version = "0.12.1"
@@ -7412,4 +7424,4 @@ vertex = ["google-cloud-aiplatform"]
[metadata]
lock-version = "2.1"
python-versions = "^3.12"
-content-hash = "c33d9ef61601de836c80517ccff66cc57837baaebf22f929c766416c0b0fd818"
+content-hash = "91f49e313e5690bbef87e17730441f26d366daeccb16b5020e03e581fbb9d4d5"
diff --git a/pyproject.toml b/pyproject.toml
index e76c21f..e0cbe23 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -54,6 +54,7 @@ docker = "^7.1.0"
textual = "^4.0.0"
xmltodict = "^0.13.0"
requests = "^2.32.0"
+cvss = "^3.2"
# Optional LLM provider dependencies
google-cloud-aiplatform = { version = ">=1.38", optional = true }
@@ -144,6 +145,7 @@ module = [
"pyte.*",
"libtmux.*",
"pytest.*",
+ "cvss.*",
]
ignore_missing_imports = true
diff --git a/strix/interface/tool_components/finish_renderer.py b/strix/interface/tool_components/finish_renderer.py
index d1701e9..87cbc21 100644
--- a/strix/interface/tool_components/finish_renderer.py
+++ b/strix/interface/tool_components/finish_renderer.py
@@ -7,6 +7,9 @@ from .base_renderer import BaseToolRenderer
from .registry import register_tool_renderer
+FIELD_STYLE = "bold #4ade80"
+
+
@register_tool_renderer
class FinishScanRenderer(BaseToolRenderer):
tool_name: ClassVar[str] = "finish_scan"
@@ -16,22 +19,41 @@ class FinishScanRenderer(BaseToolRenderer):
def render(cls, tool_data: dict[str, Any]) -> Static:
args = tool_data.get("args", {})
- content = args.get("content", "")
- success = args.get("success", True)
+ executive_summary = args.get("executive_summary", "")
+ methodology = args.get("methodology", "")
+ technical_analysis = args.get("technical_analysis", "")
+ recommendations = args.get("recommendations", "")
text = Text()
text.append("🏁 ")
+ text.append("Finishing Scan", style="bold #dc2626")
- if success:
- text.append("Finishing Scan", style="bold #dc2626")
- else:
- text.append("Scan Failed", style="bold #dc2626")
+ if executive_summary:
+ text.append("\n\n")
+ text.append("Executive Summary", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(executive_summary)
- text.append("\n ")
+ if methodology:
+ text.append("\n\n")
+ text.append("Methodology", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(methodology)
- if content:
- text.append(content, style="bold")
- else:
+ if technical_analysis:
+ text.append("\n\n")
+ text.append("Technical Analysis", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(technical_analysis)
+
+ if recommendations:
+ text.append("\n\n")
+ text.append("Recommendations", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(recommendations)
+
+ if not (executive_summary or methodology or technical_analysis or recommendations):
+ text.append("\n ")
text.append("Generating final report...", style="dim")
css_classes = cls.get_css_classes("completed")
diff --git a/strix/interface/tool_components/reporting_renderer.py b/strix/interface/tool_components/reporting_renderer.py
index f978385..8293bc0 100644
--- a/strix/interface/tool_components/reporting_renderer.py
+++ b/strix/interface/tool_components/reporting_renderer.py
@@ -1,5 +1,8 @@
+from functools import cache
from typing import Any, ClassVar
+from pygments.lexers import PythonLexer
+from pygments.styles import get_style_by_name
from rich.text import Text
from textual.widgets import Static
@@ -7,6 +10,15 @@ from .base_renderer import BaseToolRenderer
from .registry import register_tool_renderer
+@cache
+def _get_style_colors() -> dict[Any, str]:
+ style = get_style_by_name("native")
+ return {token: f"#{style_def['color']}" for token, style_def in style if style_def["color"]}
+
+
+FIELD_STYLE = "bold #4ade80"
+
+
@register_tool_renderer
class CreateVulnerabilityReportRenderer(BaseToolRenderer):
tool_name: ClassVar[str] = "create_vulnerability_report"
@@ -21,30 +33,183 @@ class CreateVulnerabilityReportRenderer(BaseToolRenderer):
}
@classmethod
- def render(cls, tool_data: dict[str, Any]) -> Static:
+ def _get_token_color(cls, token_type: Any) -> str | None:
+ colors = _get_style_colors()
+ while token_type:
+ if token_type in colors:
+ return colors[token_type]
+ token_type = token_type.parent
+ return None
+
+ @classmethod
+ def _highlight_python(cls, code: str) -> Text:
+ lexer = PythonLexer()
+ text = Text()
+
+ for token_type, token_value in lexer.get_tokens(code):
+ if not token_value:
+ continue
+ color = cls._get_token_color(token_type)
+ text.append(token_value, style=color)
+
+ return text
+
+ @classmethod
+ def _get_cvss_color(cls, cvss_score: float) -> str:
+ if cvss_score >= 9.0:
+ return "#dc2626"
+ if cvss_score >= 7.0:
+ return "#ea580c"
+ if cvss_score >= 4.0:
+ return "#d97706"
+ if cvss_score >= 0.1:
+ return "#65a30d"
+ return "#6b7280"
+
+ @classmethod
+ def render(cls, tool_data: dict[str, Any]) -> Static: # noqa: PLR0912, PLR0915
args = tool_data.get("args", {})
+ result = tool_data.get("result", {})
title = args.get("title", "")
- severity = args.get("severity", "")
- content = args.get("content", "")
+ description = args.get("description", "")
+ impact = args.get("impact", "")
+ target = args.get("target", "")
+ technical_analysis = args.get("technical_analysis", "")
+ poc_description = args.get("poc_description", "")
+ poc_script_code = args.get("poc_script_code", "")
+ remediation_steps = args.get("remediation_steps", "")
+
+ attack_vector = args.get("attack_vector", "")
+ attack_complexity = args.get("attack_complexity", "")
+ privileges_required = args.get("privileges_required", "")
+ user_interaction = args.get("user_interaction", "")
+ scope = args.get("scope", "")
+ confidentiality = args.get("confidentiality", "")
+ integrity = args.get("integrity", "")
+ availability = args.get("availability", "")
+
+ endpoint = args.get("endpoint", "")
+ method = args.get("method", "")
+ cve = args.get("cve", "")
+
+ severity = ""
+ cvss_score = None
+ if isinstance(result, dict):
+ severity = result.get("severity", "")
+ cvss_score = result.get("cvss_score")
text = Text()
text.append("🐞 ")
text.append("Vulnerability Report", style="bold #ea580c")
if title:
- text.append("\n ")
- text.append(title, style="bold")
+ text.append("\n\n")
+ text.append("Title: ", style=FIELD_STYLE)
+ text.append(title)
- if severity:
- severity_color = cls.SEVERITY_COLORS.get(severity.lower(), "#6b7280")
- text.append("\n Severity: ")
- text.append(severity.upper(), style=severity_color)
+ if severity:
+ text.append("\n\n")
+ text.append("Severity: ", style=FIELD_STYLE)
+ severity_color = cls.SEVERITY_COLORS.get(severity.lower(), "#6b7280")
+ text.append(severity.upper(), style=f"bold {severity_color}")
- if content:
- text.append("\n ")
- text.append(content, style="dim")
- else:
+ if cvss_score is not None:
+ text.append("\n\n")
+ text.append("CVSS Score: ", style=FIELD_STYLE)
+ cvss_color = cls._get_cvss_color(cvss_score)
+ text.append(str(cvss_score), style=f"bold {cvss_color}")
+
+ if target:
+ text.append("\n\n")
+ text.append("Target: ", style=FIELD_STYLE)
+ text.append(target)
+
+ if endpoint:
+ text.append("\n\n")
+ text.append("Endpoint: ", style=FIELD_STYLE)
+ text.append(endpoint)
+
+ if method:
+ text.append("\n\n")
+ text.append("Method: ", style=FIELD_STYLE)
+ text.append(method)
+
+ if cve:
+ text.append("\n\n")
+ text.append("CVE: ", style=FIELD_STYLE)
+ text.append(cve)
+
+ if any(
+ [
+ attack_vector,
+ attack_complexity,
+ privileges_required,
+ user_interaction,
+ scope,
+ confidentiality,
+ integrity,
+ availability,
+ ]
+ ):
+ text.append("\n\n")
+ cvss_parts = []
+ if attack_vector:
+ cvss_parts.append(f"AV:{attack_vector}")
+ if attack_complexity:
+ cvss_parts.append(f"AC:{attack_complexity}")
+ if privileges_required:
+ cvss_parts.append(f"PR:{privileges_required}")
+ if user_interaction:
+ cvss_parts.append(f"UI:{user_interaction}")
+ if scope:
+ cvss_parts.append(f"S:{scope}")
+ if confidentiality:
+ cvss_parts.append(f"C:{confidentiality}")
+ if integrity:
+ cvss_parts.append(f"I:{integrity}")
+ if availability:
+ cvss_parts.append(f"A:{availability}")
+ text.append("CVSS Vector: ", style=FIELD_STYLE)
+ text.append("/".join(cvss_parts), style="dim")
+
+ if description:
+ text.append("\n\n")
+ text.append("Description", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(description)
+
+ if impact:
+ text.append("\n\n")
+ text.append("Impact", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(impact)
+
+ if technical_analysis:
+ text.append("\n\n")
+ text.append("Technical Analysis", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(technical_analysis)
+
+ if poc_description:
+ text.append("\n\n")
+ text.append("PoC Description", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(poc_description)
+
+ if poc_script_code:
+ text.append("\n\n")
+ text.append("PoC Code", style=FIELD_STYLE)
+ text.append("\n")
+ text.append_text(cls._highlight_python(poc_script_code))
+
+ if remediation_steps:
+ text.append("\n\n")
+ text.append("Remediation", style=FIELD_STYLE)
+ text.append("\n")
+ text.append(remediation_steps)
+
+ if not title:
text.append("\n ")
text.append("Creating report...", style="dim")
diff --git a/strix/telemetry/tracer.py b/strix/telemetry/tracer.py
index 8e2b491..8250bb3 100644
--- a/strix/telemetry/tracer.py
+++ b/strix/telemetry/tracer.py
@@ -71,47 +71,114 @@ class Tracer:
return self._run_dir
- def add_vulnerability_report(
+ def add_vulnerability_report( # noqa: PLR0912
self,
title: str,
- content: str,
severity: str,
+ description: str | None = None,
+ impact: str | None = None,
+ target: str | None = None,
+ technical_analysis: str | None = None,
+ poc_description: str | None = None,
+ poc_script_code: str | None = None,
+ remediation_steps: str | None = None,
+ cvss: float | None = None,
+ cvss_breakdown: dict[str, str] | None = None,
+ endpoint: str | None = None,
+ method: str | None = None,
+ cve: str | None = None,
+ code_file: str | None = None,
+ code_before: str | None = None,
+ code_after: str | None = None,
+ code_diff: str | None = None,
) -> str:
report_id = f"vuln-{len(self.vulnerability_reports) + 1:04d}"
- report = {
+ report: dict[str, Any] = {
"id": report_id,
"title": title.strip(),
- "content": content.strip(),
"severity": severity.lower().strip(),
"timestamp": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
}
+ if description:
+ report["description"] = description.strip()
+ if impact:
+ report["impact"] = impact.strip()
+ if target:
+ report["target"] = target.strip()
+ if technical_analysis:
+ report["technical_analysis"] = technical_analysis.strip()
+ if poc_description:
+ report["poc_description"] = poc_description.strip()
+ if poc_script_code:
+ report["poc_script_code"] = poc_script_code.strip()
+ if remediation_steps:
+ report["remediation_steps"] = remediation_steps.strip()
+ if cvss is not None:
+ report["cvss"] = cvss
+ if cvss_breakdown:
+ report["cvss_breakdown"] = cvss_breakdown
+ if endpoint:
+ report["endpoint"] = endpoint.strip()
+ if method:
+ report["method"] = method.strip()
+ if cve:
+ report["cve"] = cve.strip()
+ if code_file:
+ report["code_file"] = code_file.strip()
+ if code_before:
+ report["code_before"] = code_before.strip()
+ if code_after:
+ report["code_after"] = code_after.strip()
+ if code_diff:
+ report["code_diff"] = code_diff.strip()
+
self.vulnerability_reports.append(report)
logger.info(f"Added vulnerability report: {report_id} - {title}")
if self.vulnerability_found_callback:
self.vulnerability_found_callback(
- report_id, title.strip(), content.strip(), severity.lower().strip()
+ report_id, title.strip(), description or "", severity.lower().strip()
)
self.save_run_data()
return report_id
- def set_final_scan_result(
+ def update_scan_final_fields(
self,
- content: str,
- success: bool = True,
+ executive_summary: str,
+ methodology: str,
+ technical_analysis: str,
+ recommendations: str,
) -> None:
- self.final_scan_result = content.strip()
-
self.scan_results = {
"scan_completed": True,
- "content": content,
- "success": success,
+ "executive_summary": executive_summary.strip(),
+ "methodology": methodology.strip(),
+ "technical_analysis": technical_analysis.strip(),
+ "recommendations": recommendations.strip(),
+ "success": True,
}
- logger.info(f"Set final scan result: success={success}")
+ self.final_scan_result = f"""# Executive Summary
+
+{executive_summary.strip()}
+
+# Methodology
+
+{methodology.strip()}
+
+# Technical Analysis
+
+{technical_analysis.strip()}
+
+# Recommendations
+
+{recommendations.strip()}
+"""
+
+ logger.info("Updated scan final fields")
self.save_run_data(mark_complete=True)
def log_agent_creation(
@@ -204,7 +271,7 @@ class Tracer:
)
self.get_run_dir()
- def save_run_data(self, mark_complete: bool = False) -> None:
+ def save_run_data(self, mark_complete: bool = False) -> None: # noqa: PLR0912, PLR0915
try:
run_dir = self.get_run_dir()
if mark_complete:
@@ -232,42 +299,89 @@ class Tracer:
if report["id"] not in self._saved_vuln_ids
]
+ severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}
+ sorted_reports = sorted(
+ self.vulnerability_reports,
+ key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]),
+ )
+
for report in new_reports:
vuln_file = vuln_dir / f"{report['id']}.md"
with vuln_file.open("w", encoding="utf-8") as f:
- f.write(f"# {report['title']}\n\n")
- f.write(f"**ID:** {report['id']}\n")
- f.write(f"**Severity:** {report['severity'].upper()}\n")
- f.write(f"**Found:** {report['timestamp']}\n\n")
- f.write("## Description\n\n")
- f.write(f"{report['content']}\n")
+ f.write(f"# {report.get('title', 'Untitled Vulnerability')}\n\n")
+ f.write(f"**ID:** {report.get('id', 'unknown')}\n")
+ f.write(f"**Severity:** {report.get('severity', 'unknown').upper()}\n")
+ f.write(f"**Found:** {report.get('timestamp', 'unknown')}\n")
+
+ metadata_fields: list[tuple[str, Any]] = [
+ ("Target", report.get("target")),
+ ("Endpoint", report.get("endpoint")),
+ ("Method", report.get("method")),
+ ("CVE", report.get("cve")),
+ ]
+ cvss_score = report.get("cvss")
+ if cvss_score is not None:
+ metadata_fields.append(("CVSS", cvss_score))
+
+ for label, value in metadata_fields:
+ if value:
+ f.write(f"**{label}:** {value}\n")
+
+ f.write("\n## Description\n\n")
+ desc = report.get("description") or "No description provided."
+ f.write(f"{desc}\n\n")
+
+ if report.get("impact"):
+ f.write("## Impact\n\n")
+ f.write(f"{report['impact']}\n\n")
+
+ if report.get("technical_analysis"):
+ f.write("## Technical Analysis\n\n")
+ f.write(f"{report['technical_analysis']}\n\n")
+
+ if report.get("poc_description") or report.get("poc_script_code"):
+ f.write("## Proof of Concept\n\n")
+ if report.get("poc_description"):
+ f.write(f"{report['poc_description']}\n\n")
+ if report.get("poc_script_code"):
+ f.write("```\n")
+ f.write(f"{report['poc_script_code']}\n")
+ f.write("```\n\n")
+
+ if report.get("code_file") or report.get("code_diff"):
+ f.write("## Code Analysis\n\n")
+ if report.get("code_file"):
+ f.write(f"**File:** {report['code_file']}\n\n")
+ if report.get("code_diff"):
+ f.write("**Changes:**\n")
+ f.write("```diff\n")
+ f.write(f"{report['code_diff']}\n")
+ f.write("```\n\n")
+
+ if report.get("remediation_steps"):
+ f.write("## Remediation\n\n")
+ f.write(f"{report['remediation_steps']}\n\n")
+
self._saved_vuln_ids.add(report["id"])
- if self.vulnerability_reports:
- severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}
- sorted_reports = sorted(
- self.vulnerability_reports,
- key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]),
- )
+ vuln_csv_file = run_dir / "vulnerabilities.csv"
+ with vuln_csv_file.open("w", encoding="utf-8", newline="") as f:
+ import csv
- vuln_csv_file = run_dir / "vulnerabilities.csv"
- with vuln_csv_file.open("w", encoding="utf-8", newline="") as f:
- import csv
+ fieldnames = ["id", "title", "severity", "timestamp", "file"]
+ writer = csv.DictWriter(f, fieldnames=fieldnames)
+ writer.writeheader()
- fieldnames = ["id", "title", "severity", "timestamp", "file"]
- writer = csv.DictWriter(f, fieldnames=fieldnames)
- writer.writeheader()
-
- for report in sorted_reports:
- writer.writerow(
- {
- "id": report["id"],
- "title": report["title"],
- "severity": report["severity"].upper(),
- "timestamp": report["timestamp"],
- "file": f"vulnerabilities/{report['id']}.md",
- }
- )
+ for report in sorted_reports:
+ writer.writerow(
+ {
+ "id": report["id"],
+ "title": report["title"],
+ "severity": report["severity"].upper(),
+ "timestamp": report["timestamp"],
+ "file": f"vulnerabilities/{report['id']}.md",
+ }
+ )
if new_reports:
logger.info(
diff --git a/strix/tools/finish/finish_actions.py b/strix/tools/finish/finish_actions.py
index 42c3e67..79f48e7 100644
--- a/strix/tools/finish/finish_actions.py
+++ b/strix/tools/finish/finish_actions.py
@@ -4,49 +4,40 @@ from strix.tools.registry import register_tool
def _validate_root_agent(agent_state: Any) -> dict[str, Any] | None:
- if (
- agent_state is not None
- and hasattr(agent_state, "parent_id")
- and agent_state.parent_id is not None
- ):
+ if agent_state and hasattr(agent_state, "parent_id") and agent_state.parent_id is not None:
return {
"success": False,
- "message": (
- "This tool can only be used by the root/main agent. "
- "Subagents must use agent_finish instead."
- ),
+ "error": "finish_scan_wrong_agent",
+ "message": "This tool can only be used by the root/main agent",
+ "suggestion": "If you are a subagent, use agent_finish from agents_graph tool instead",
}
return None
-def _validate_content(content: str) -> dict[str, Any] | None:
- if not content or not content.strip():
- return {"success": False, "message": "Content cannot be empty"}
- return None
-
-
def _check_active_agents(agent_state: Any = None) -> dict[str, Any] | None:
try:
from strix.tools.agents_graph.agents_graph_actions import _agent_graph
- current_agent_id = None
- if agent_state and hasattr(agent_state, "agent_id"):
+ if agent_state and agent_state.agent_id:
current_agent_id = agent_state.agent_id
+ else:
+ return None
- running_agents = []
+ active_agents = []
stopping_agents = []
- for agent_id, node in _agent_graph.get("nodes", {}).items():
+ for agent_id, node in _agent_graph["nodes"].items():
if agent_id == current_agent_id:
continue
- status = node.get("status", "")
+ status = node.get("status", "unknown")
if status == "running":
- running_agents.append(
+ active_agents.append(
{
"id": agent_id,
"name": node.get("name", "Unknown"),
- "task": node.get("task", "No task description"),
+ "task": node.get("task", "Unknown task")[:300],
+ "status": status,
}
)
elif status == "stopping":
@@ -54,121 +45,105 @@ def _check_active_agents(agent_state: Any = None) -> dict[str, Any] | None:
{
"id": agent_id,
"name": node.get("name", "Unknown"),
+ "task": node.get("task", "Unknown task")[:300],
+ "status": status,
}
)
- if running_agents or stopping_agents:
- message_parts = ["Cannot finish scan while other agents are still active:"]
-
- if running_agents:
- message_parts.append("\n\nRunning agents:")
- message_parts.extend(
- [
- f" - {agent['name']} ({agent['id']}): {agent['task']}"
- for agent in running_agents
- ]
- )
-
- if stopping_agents:
- message_parts.append("\n\nStopping agents:")
- message_parts.extend(
- [f" - {agent['name']} ({agent['id']})" for agent in stopping_agents]
- )
-
- message_parts.extend(
- [
- "\n\nSuggested actions:",
- "1. Use wait_for_message to wait for all agents to complete",
- "2. Send messages to agents asking them to finish if urgent",
- "3. Use view_agent_graph to monitor agent status",
- ]
- )
-
- return {
+ if active_agents or stopping_agents:
+ response: dict[str, Any] = {
"success": False,
- "message": "\n".join(message_parts),
- "active_agents": {
- "running": len(running_agents),
- "stopping": len(stopping_agents),
- "details": {
- "running": running_agents,
- "stopping": stopping_agents,
- },
- },
+ "error": "agents_still_active",
+ "message": "Cannot finish scan: agents are still active",
}
+ if active_agents:
+ response["active_agents"] = active_agents
+
+ if stopping_agents:
+ response["stopping_agents"] = stopping_agents
+
+ response["suggestions"] = [
+ "Use wait_for_message to wait for all agents to complete",
+ "Use send_message_to_agent if you need agents to complete immediately",
+ "Check agent_status to see current agent states",
+ ]
+
+ response["total_active"] = len(active_agents) + len(stopping_agents)
+
+ return response
+
except ImportError:
+ pass
+ except Exception:
import logging
- logging.warning("Could not check agent graph status - agents_graph module unavailable")
+ logging.exception("Error checking active agents")
return None
-def _finalize_with_tracer(content: str, success: bool) -> dict[str, Any]:
+@register_tool(sandbox_execution=False)
+def finish_scan(
+ executive_summary: str,
+ methodology: str,
+ technical_analysis: str,
+ recommendations: str,
+ agent_state: Any = None,
+) -> dict[str, Any]:
+ validation_error = _validate_root_agent(agent_state)
+ if validation_error:
+ return validation_error
+
+ active_agents_error = _check_active_agents(agent_state)
+ if active_agents_error:
+ return active_agents_error
+
+ validation_errors = []
+
+ if not executive_summary or not executive_summary.strip():
+ validation_errors.append("Executive summary cannot be empty")
+ if not methodology or not methodology.strip():
+ validation_errors.append("Methodology cannot be empty")
+ if not technical_analysis or not technical_analysis.strip():
+ validation_errors.append("Technical analysis cannot be empty")
+ if not recommendations or not recommendations.strip():
+ validation_errors.append("Recommendations cannot be empty")
+
+ if validation_errors:
+ return {"success": False, "message": "Validation failed", "errors": validation_errors}
+
try:
from strix.telemetry.tracer import get_global_tracer
tracer = get_global_tracer()
if tracer:
- tracer.set_final_scan_result(
- content=content.strip(),
- success=success,
+ tracer.update_scan_final_fields(
+ executive_summary=executive_summary.strip(),
+ methodology=methodology.strip(),
+ technical_analysis=technical_analysis.strip(),
+ recommendations=recommendations.strip(),
)
+ vulnerability_count = len(tracer.vulnerability_reports)
+
return {
"success": True,
"scan_completed": True,
- "message": "Scan completed successfully"
- if success
- else "Scan completed with errors",
- "vulnerabilities_found": len(tracer.vulnerability_reports),
+ "message": "Scan completed successfully",
+ "vulnerabilities_found": vulnerability_count,
}
import logging
- logging.warning("Global tracer not available - final scan result not stored")
+ logging.warning("Current tracer not available - scan results not stored")
- return { # noqa: TRY300
- "success": True,
- "scan_completed": True,
- "message": "Scan completed successfully (not persisted)"
- if success
- else "Scan completed with errors (not persisted)",
- "warning": "Final result could not be persisted - tracer unavailable",
- }
-
- except ImportError:
+ except (ImportError, AttributeError) as e:
+ return {"success": False, "message": f"Failed to complete scan: {e!s}"}
+ else:
return {
"success": True,
"scan_completed": True,
- "message": "Scan completed successfully (not persisted)"
- if success
- else "Scan completed with errors (not persisted)",
- "warning": "Final result could not be persisted - tracer module unavailable",
+ "message": "Scan completed (not persisted)",
+ "warning": "Results could not be persisted - tracer unavailable",
}
-
-
-@register_tool(sandbox_execution=False)
-def finish_scan(
- content: str,
- success: bool = True,
- agent_state: Any = None,
-) -> dict[str, Any]:
- try:
- validation_error = _validate_root_agent(agent_state)
- if validation_error:
- return validation_error
-
- validation_error = _validate_content(content)
- if validation_error:
- return validation_error
-
- active_agents_error = _check_active_agents(agent_state)
- if active_agents_error:
- return active_agents_error
-
- return _finalize_with_tracer(content, success)
-
- except (ValueError, TypeError, KeyError) as e:
- return {"success": False, "message": f"Failed to complete scan: {e!s}"}
diff --git a/strix/tools/finish/finish_actions_schema.xml b/strix/tools/finish/finish_actions_schema.xml
index dc8ddde..d9c1bb7 100644
--- a/strix/tools/finish/finish_actions_schema.xml
+++ b/strix/tools/finish/finish_actions_schema.xml
@@ -1,6 +1,6 @@
- Complete the main security scan and generate final report.
+ Complete the security scan by providing the final assessment fields as full penetration test report.
IMPORTANT: This tool can ONLY be used by the root/main agent.
Subagents must use agent_finish from agents_graph tool instead.
@@ -8,11 +8,20 @@ Subagents must use agent_finish from agents_graph tool instead.
IMPORTANT: This tool will NOT allow finishing if any agents are still running or stopping.
You must wait for all agents to complete before using this tool.
-This tool MUST be called at the very end of the security assessment to:
-- Verify all agents have completed their tasks
-- Generate the final comprehensive scan report
-- Mark the entire scan as completed
-- Stop the agent from running
+This tool directly updates the scan report data:
+- executive_summary
+- methodology
+- technical_analysis
+- recommendations
+
+All fields are REQUIRED and map directly to the final report.
+
+This must be the last tool called in the scan. It will:
+1. Verify you are the root agent
+2. Check all subagents have completed
+3. Update the scan with your provided fields
+4. Mark the scan as completed
+5. Stop agent execution
Use this tool when:
- You are the main/root agent conducting the security assessment
@@ -23,23 +32,39 @@ Use this tool when:
IMPORTANT: Calling this tool multiple times will OVERWRITE any previous scan report.
Make sure you include ALL findings and details in a single comprehensive report.
-If agents are still running, this tool will:
+If agents are still running, the tool will:
- Show you which agents are still active
- Suggest using wait_for_message to wait for completion
- Suggest messaging agents if immediate completion is needed
-Put ALL details in the content - methodology, tools used, vulnerability counts, key findings, recommendations,
-compliance notes, risk assessments, next steps, etc. Be comprehensive and include everything relevant.
+NOTE: Make sure the vulnerabilities found were reported with create_vulnerability_report tool, otherwise they will not be tracked and you will not be rewarded.
+But make sure to not report the same vulnerability multiple times.
+
+Professional, customer-facing penetration test report rules (PDF-ready):
+- Do NOT include internal or system details: never mention local/absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection/tooling issues, or tester environment details.
+- Tone and style: formal, objective, third-person, concise. No internal checklists or engineering runbooks. Content must read as a polished client deliverable.
+- Structure across fields should align to standard pentest reports:
+ - Executive summary: business impact, risk posture, notable criticals, remediation theme.
+ - Methodology: industry-standard methods (e.g., OWASP, OSSTMM, NIST), scope, constraints—no internal execution notes.
+ - Technical analysis: consolidated findings overview referencing created vulnerability reports; avoid raw logs.
+ - Recommendations: prioritized, actionable, aligned to risk and best practices.
+
-
- Complete scan report including executive summary, methodology, findings, vulnerability details, recommendations, compliance notes, risk assessment, and conclusions. Include everything relevant to the assessment.
+
+ High-level summary for executives: key findings, overall security posture, critical risks, business impact
-
- Whether the scan completed successfully without critical errors
+
+ Testing methodology: approach, tools used, scope, techniques employed
+
+
+ Detailed technical findings and security assessment results over the scan
+
+
+ Actionable security recommendations and remediation priorities
- Response containing success status and completion message. If agents are still running, returns details about active agents and suggested actions.
+ Response containing success status, vulnerability count, and completion message. If agents are still running, returns details about active agents and suggested actions.
diff --git a/strix/tools/reporting/reporting_actions.py b/strix/tools/reporting/reporting_actions.py
index dd98d6d..13fe55d 100644
--- a/strix/tools/reporting/reporting_actions.py
+++ b/strix/tools/reporting/reporting_actions.py
@@ -3,61 +3,209 @@ from typing import Any
from strix.tools.registry import register_tool
+def calculate_cvss_and_severity(
+ attack_vector: str,
+ attack_complexity: str,
+ privileges_required: str,
+ user_interaction: str,
+ scope: str,
+ confidentiality: str,
+ integrity: str,
+ availability: str,
+) -> tuple[float, str, str]:
+ try:
+ from cvss import CVSS3
+
+ vector = (
+ f"CVSS:3.1/AV:{attack_vector}/AC:{attack_complexity}/"
+ f"PR:{privileges_required}/UI:{user_interaction}/S:{scope}/"
+ f"C:{confidentiality}/I:{integrity}/A:{availability}"
+ )
+
+ c = CVSS3(vector)
+ scores = c.scores()
+ severities = c.severities()
+
+ base_score = scores[0]
+ base_severity = severities[0]
+
+ severity = base_severity.lower()
+
+ except Exception:
+ import logging
+
+ logging.exception("Failed to calculate CVSS")
+ return 7.5, "high", ""
+ else:
+ return base_score, severity, vector
+
+
+def _validate_required_fields(**kwargs: str | None) -> list[str]:
+ validation_errors: list[str] = []
+
+ required_fields = {
+ "title": "Title cannot be empty",
+ "description": "Description cannot be empty",
+ "impact": "Impact cannot be empty",
+ "target": "Target cannot be empty",
+ "technical_analysis": "Technical analysis cannot be empty",
+ "poc_description": "PoC description cannot be empty",
+ "poc_script_code": "PoC script/code is REQUIRED - provide the actual exploit/payload",
+ "remediation_steps": "Remediation steps cannot be empty",
+ }
+
+ for field_name, error_msg in required_fields.items():
+ value = kwargs.get(field_name)
+ if not value or not str(value).strip():
+ validation_errors.append(error_msg)
+
+ return validation_errors
+
+
+def _validate_cvss_parameters(**kwargs: str) -> list[str]:
+ validation_errors: list[str] = []
+
+ cvss_validations = {
+ "attack_vector": ["N", "A", "L", "P"],
+ "attack_complexity": ["L", "H"],
+ "privileges_required": ["N", "L", "H"],
+ "user_interaction": ["N", "R"],
+ "scope": ["U", "C"],
+ "confidentiality": ["N", "L", "H"],
+ "integrity": ["N", "L", "H"],
+ "availability": ["N", "L", "H"],
+ }
+
+ for param_name, valid_values in cvss_validations.items():
+ value = kwargs.get(param_name)
+ if value not in valid_values:
+ validation_errors.append(
+ f"Invalid {param_name}: {value}. Must be one of: {valid_values}"
+ )
+
+ return validation_errors
+
+
@register_tool(sandbox_execution=False)
def create_vulnerability_report(
title: str,
- content: str,
- severity: str,
+ description: str,
+ impact: str,
+ target: str,
+ technical_analysis: str,
+ poc_description: str,
+ poc_script_code: str,
+ remediation_steps: str,
+ # CVSS Breakdown Components
+ attack_vector: str,
+ attack_complexity: str,
+ privileges_required: str,
+ user_interaction: str,
+ scope: str,
+ confidentiality: str,
+ integrity: str,
+ availability: str,
+ # Optional fields
+ endpoint: str | None = None,
+ method: str | None = None,
+ cve: str | None = None,
+ code_file: str | None = None,
+ code_before: str | None = None,
+ code_after: str | None = None,
+ code_diff: str | None = None,
) -> dict[str, Any]:
- validation_error = None
- if not title or not title.strip():
- validation_error = "Title cannot be empty"
- elif not content or not content.strip():
- validation_error = "Content cannot be empty"
- elif not severity or not severity.strip():
- validation_error = "Severity cannot be empty"
- else:
- valid_severities = ["critical", "high", "medium", "low", "info"]
- if severity.lower() not in valid_severities:
- validation_error = (
- f"Invalid severity '{severity}'. Must be one of: {', '.join(valid_severities)}"
- )
+ validation_errors = _validate_required_fields(
+ title=title,
+ description=description,
+ impact=impact,
+ target=target,
+ technical_analysis=technical_analysis,
+ poc_description=poc_description,
+ poc_script_code=poc_script_code,
+ remediation_steps=remediation_steps,
+ )
- if validation_error:
- return {"success": False, "message": validation_error}
+ validation_errors.extend(
+ _validate_cvss_parameters(
+ attack_vector=attack_vector,
+ attack_complexity=attack_complexity,
+ privileges_required=privileges_required,
+ user_interaction=user_interaction,
+ scope=scope,
+ confidentiality=confidentiality,
+ integrity=integrity,
+ availability=availability,
+ )
+ )
+
+ if validation_errors:
+ return {"success": False, "message": "Validation failed", "errors": validation_errors}
+
+ cvss_score, severity, cvss_vector = calculate_cvss_and_severity(
+ attack_vector,
+ attack_complexity,
+ privileges_required,
+ user_interaction,
+ scope,
+ confidentiality,
+ integrity,
+ availability,
+ )
try:
from strix.telemetry.tracer import get_global_tracer
tracer = get_global_tracer()
if tracer:
+ cvss_breakdown = {
+ "attack_vector": attack_vector,
+ "attack_complexity": attack_complexity,
+ "privileges_required": privileges_required,
+ "user_interaction": user_interaction,
+ "scope": scope,
+ "confidentiality": confidentiality,
+ "integrity": integrity,
+ "availability": availability,
+ }
+
report_id = tracer.add_vulnerability_report(
title=title,
- content=content,
+ description=description,
severity=severity,
+ impact=impact,
+ target=target,
+ technical_analysis=technical_analysis,
+ poc_description=poc_description,
+ poc_script_code=poc_script_code,
+ remediation_steps=remediation_steps,
+ cvss=cvss_score,
+ cvss_breakdown=cvss_breakdown,
+ endpoint=endpoint,
+ method=method,
+ cve=cve,
+ code_file=code_file,
+ code_before=code_before,
+ code_after=code_after,
+ code_diff=code_diff,
)
return {
"success": True,
"message": f"Vulnerability report '{title}' created successfully",
"report_id": report_id,
- "severity": severity.lower(),
+ "severity": severity,
+ "cvss_score": cvss_score,
}
+
import logging
- logging.warning("Global tracer not available - vulnerability report not stored")
+ logging.warning("Current tracer not available - vulnerability report not stored")
- return { # noqa: TRY300
- "success": True,
- "message": f"Vulnerability report '{title}' created successfully (not persisted)",
- "warning": "Report could not be persisted - tracer unavailable",
- }
-
- except ImportError:
+ except (ImportError, AttributeError) as e:
+ return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"}
+ else:
return {
"success": True,
- "message": f"Vulnerability report '{title}' created successfully (not persisted)",
- "warning": "Report could not be persisted - tracer module unavailable",
+ "message": f"Vulnerability report '{title}' created (not persisted)",
+ "warning": "Report could not be persisted - tracer unavailable",
}
- except (ValueError, TypeError) as e:
- return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"}
diff --git a/strix/tools/reporting/reporting_actions_schema.xml b/strix/tools/reporting/reporting_actions_schema.xml
index 2e47d60..8b6b9ae 100644
--- a/strix/tools/reporting/reporting_actions_schema.xml
+++ b/strix/tools/reporting/reporting_actions_schema.xml
@@ -2,8 +2,7 @@
Create a vulnerability report for a discovered security issue.
-Use this tool to document a specific verified security vulnerability.
-Put ALL details in the content field - affected URLs, parameters, proof of concept, remediation steps, CVE references, CVSS scores, technical details, impact assessment, etc.
+Use this tool to document a specific fully verified security vulnerability.
DO NOT USE:
- For general security observations without specific vulnerabilities
@@ -11,20 +10,119 @@ DO NOT USE:
- When you don't have a proof of concept, or still not 100% sure if it's a vulnerability
- For tracking multiple vulnerabilities (create separate reports)
- For reporting multiple vulnerabilities at once. Use a separate create_vulnerability_report for each vulnerability.
+
+White-box requirement (when you have access to the code): You MUST include code_file, code_before, code_after, and code_diff. These must contain the actual code (before/after) and a complete, apply-able unified diff.
+
+Professional, customer-facing report rules (PDF-ready):
+- Do NOT include internal or system details: never mention local or absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection issues, internal errors/logs/stack traces, or tester machine environment details.
+- Tone and style: formal, objective, third-person, vendor-neutral, concise. No runbooks, checklists, or engineering notes. Avoid headings like "QUICK", "Approach", or "Techniques" that read like internal guidance.
+- Use a standard penetration testing report structure per finding:
+ 1) Overview
+ 2) Severity and CVSS (vector only)
+ 3) Affected asset(s)
+ 4) Technical details
+ 5) Proof of concept (repro steps plus code)
+ 6) Impact
+ 7) Remediation
+ 8) Evidence (optional request/response excerpts, etc.) in the technical analysis field.
+- Numbered steps are allowed ONLY within the proof of concept. Elsewhere, use clear, concise paragraphs suitable for customer-facing reports.
+- Language must be precise and non-vague; avoid hedging.
- Clear, concise title of the vulnerability
+ Clear, specific title (e.g., "SQL Injection in /api/users Login Parameter"). But not too long. Don't mention CVE number in the title.
-
- Complete vulnerability details including affected URLs, technical details, impact, proof of concept, remediation steps, and any relevant references. Be comprehensive and include everything relevant.
+
+ Comprehensive description of the vulnerability and how it was discovered
-
- Severity level: critical, high, medium, low, or info
+
+ Impact assessment: what attacker can do, business risk, data at risk
+
+
+ Affected target: URL, domain, or Git repository
+
+
+ Technical explanation of the vulnerability mechanism and root cause
+
+
+ Step-by-step instructions to reproduce the vulnerability
+
+
+ Actual proof of concept code, exploit, payload, or script that demonstrates the vulnerability. Python code.
+
+
+ Specific, actionable steps to fix the vulnerability
+
+
+ CVSS Attack Vector - How the vulnerability is exploited:
+N = Network (remotely exploitable)
+A = Adjacent (same network segment)
+L = Local (local access required)
+P = Physical (physical access required)
+
+
+ CVSS Attack Complexity - Conditions beyond attacker's control:
+L = Low (no special conditions)
+H = High (special conditions must exist)
+
+
+ CVSS Privileges Required - Level of privileges needed:
+N = None (no privileges needed)
+L = Low (basic user privileges)
+H = High (admin privileges)
+
+
+ CVSS User Interaction - Does exploit require user action:
+N = None (no user interaction needed)
+R = Required (user must perform some action)
+
+
+ CVSS Scope - Can the vulnerability affect resources beyond its security scope:
+U = Unchanged (only affects the vulnerable component)
+C = Changed (affects resources beyond vulnerable component)
+
+
+ CVSS Confidentiality Impact - Impact to confidentiality:
+N = None (no impact)
+L = Low (some information disclosure)
+H = High (all information disclosed)
+
+
+ CVSS Integrity Impact - Impact to integrity:
+N = None (no impact)
+L = Low (data can be modified but scope is limited)
+H = High (total loss of integrity)
+
+
+ CVSS Availability Impact - Impact to availability:
+N = None (no impact)
+L = Low (reduced performance or interruptions)
+H = High (total loss of availability)
+
+
+ API endpoint(s) or URL path(s) (e.g., "/api/login") - for web vulnerabilities, or Git repository path(s) - for code vulnerabilities
+
+
+ HTTP method(s) (GET, POST, etc.) - for web vulnerabilities.
+
+
+ CVE identifier (e.g., "CVE-2024-1234"). Make sure it's a valid CVE. Use web search or vulnerability databases to make sure it's a valid CVE number.
+
+
+ MANDATORY for white-box testing: exact affected source file path(s).
+
+
+ MANDATORY for white-box testing: actual vulnerable code snippet(s) copied verbatim from the repository.
+
+
+ MANDATORY for white-box testing: corrected code snippet(s) exactly as they should appear after the fix.
+
+
+ MANDATORY for white-box testing: unified diff showing the code changes. Must be a complete, apply-able unified diff (git format) covering all affected files, with proper file headers, line numbers, and sufficient context.
- Response containing success status and message
+ Response containing success=true, message, report_id, severity, cvss_score