From 2ea5ff6695109be3dfd6cc6fd56850cd0defcd8d Mon Sep 17 00:00:00 2001 From: 0xallam Date: Wed, 7 Jan 2026 16:33:16 -0800 Subject: [PATCH] feat(reporting): enhance vulnerability reporting with detailed fields and CVSS calculation --- poetry.lock | 14 +- pyproject.toml | 2 + .../tool_components/finish_renderer.py | 42 +++- .../tool_components/reporting_renderer.py | 191 ++++++++++++++-- strix/telemetry/tracer.py | 200 +++++++++++++---- strix/tools/finish/finish_actions.py | 191 +++++++--------- strix/tools/finish/finish_actions_schema.xml | 53 +++-- strix/tools/reporting/reporting_actions.py | 210 +++++++++++++++--- .../reporting/reporting_actions_schema.xml | 114 +++++++++- 9 files changed, 789 insertions(+), 228 deletions(-) diff --git a/poetry.lock b/poetry.lock index c3753e2..c3b5fec 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1157,6 +1157,18 @@ ssh = ["bcrypt (>=3.1.5)"] test = ["certifi (>=2024)", "cryptography-vectors (==44.0.1)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"] test-randomorder = ["pytest-randomly"] +[[package]] +name = "cvss" +version = "3.6" +description = "CVSS2/3/4 library with interactive calculator for Python 2 and Python 3" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "cvss-3.6-py2.py3-none-any.whl", hash = "sha256:e342c6ad9c7eb69d2aebbbc2768a03cabd57eb947c806e145de5b936219833ea"}, + {file = "cvss-3.6.tar.gz", hash = "sha256:f21d18224efcd3c01b44ff1b37dec2e3208d29a6d0ce6c87a599c73c21ee1a99"}, +] + [[package]] name = "cycler" version = "0.12.1" @@ -7412,4 +7424,4 @@ vertex = ["google-cloud-aiplatform"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "c33d9ef61601de836c80517ccff66cc57837baaebf22f929c766416c0b0fd818" +content-hash = "91f49e313e5690bbef87e17730441f26d366daeccb16b5020e03e581fbb9d4d5" diff --git a/pyproject.toml b/pyproject.toml index e76c21f..e0cbe23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ docker = "^7.1.0" textual = "^4.0.0" xmltodict = "^0.13.0" requests = "^2.32.0" +cvss = "^3.2" # Optional LLM provider dependencies google-cloud-aiplatform = { version = ">=1.38", optional = true } @@ -144,6 +145,7 @@ module = [ "pyte.*", "libtmux.*", "pytest.*", + "cvss.*", ] ignore_missing_imports = true diff --git a/strix/interface/tool_components/finish_renderer.py b/strix/interface/tool_components/finish_renderer.py index d1701e9..87cbc21 100644 --- a/strix/interface/tool_components/finish_renderer.py +++ b/strix/interface/tool_components/finish_renderer.py @@ -7,6 +7,9 @@ from .base_renderer import BaseToolRenderer from .registry import register_tool_renderer +FIELD_STYLE = "bold #4ade80" + + @register_tool_renderer class FinishScanRenderer(BaseToolRenderer): tool_name: ClassVar[str] = "finish_scan" @@ -16,22 +19,41 @@ class FinishScanRenderer(BaseToolRenderer): def render(cls, tool_data: dict[str, Any]) -> Static: args = tool_data.get("args", {}) - content = args.get("content", "") - success = args.get("success", True) + executive_summary = args.get("executive_summary", "") + methodology = args.get("methodology", "") + technical_analysis = args.get("technical_analysis", "") + recommendations = args.get("recommendations", "") text = Text() text.append("🏁 ") + text.append("Finishing Scan", style="bold #dc2626") - if success: - text.append("Finishing Scan", style="bold #dc2626") - else: - text.append("Scan Failed", style="bold #dc2626") + if executive_summary: + text.append("\n\n") + text.append("Executive Summary", style=FIELD_STYLE) + text.append("\n") + text.append(executive_summary) - text.append("\n ") + if methodology: + text.append("\n\n") + text.append("Methodology", style=FIELD_STYLE) + text.append("\n") + text.append(methodology) - if content: - text.append(content, style="bold") - else: + if technical_analysis: + text.append("\n\n") + text.append("Technical Analysis", style=FIELD_STYLE) + text.append("\n") + text.append(technical_analysis) + + if recommendations: + text.append("\n\n") + text.append("Recommendations", style=FIELD_STYLE) + text.append("\n") + text.append(recommendations) + + if not (executive_summary or methodology or technical_analysis or recommendations): + text.append("\n ") text.append("Generating final report...", style="dim") css_classes = cls.get_css_classes("completed") diff --git a/strix/interface/tool_components/reporting_renderer.py b/strix/interface/tool_components/reporting_renderer.py index f978385..8293bc0 100644 --- a/strix/interface/tool_components/reporting_renderer.py +++ b/strix/interface/tool_components/reporting_renderer.py @@ -1,5 +1,8 @@ +from functools import cache from typing import Any, ClassVar +from pygments.lexers import PythonLexer +from pygments.styles import get_style_by_name from rich.text import Text from textual.widgets import Static @@ -7,6 +10,15 @@ from .base_renderer import BaseToolRenderer from .registry import register_tool_renderer +@cache +def _get_style_colors() -> dict[Any, str]: + style = get_style_by_name("native") + return {token: f"#{style_def['color']}" for token, style_def in style if style_def["color"]} + + +FIELD_STYLE = "bold #4ade80" + + @register_tool_renderer class CreateVulnerabilityReportRenderer(BaseToolRenderer): tool_name: ClassVar[str] = "create_vulnerability_report" @@ -21,30 +33,183 @@ class CreateVulnerabilityReportRenderer(BaseToolRenderer): } @classmethod - def render(cls, tool_data: dict[str, Any]) -> Static: + def _get_token_color(cls, token_type: Any) -> str | None: + colors = _get_style_colors() + while token_type: + if token_type in colors: + return colors[token_type] + token_type = token_type.parent + return None + + @classmethod + def _highlight_python(cls, code: str) -> Text: + lexer = PythonLexer() + text = Text() + + for token_type, token_value in lexer.get_tokens(code): + if not token_value: + continue + color = cls._get_token_color(token_type) + text.append(token_value, style=color) + + return text + + @classmethod + def _get_cvss_color(cls, cvss_score: float) -> str: + if cvss_score >= 9.0: + return "#dc2626" + if cvss_score >= 7.0: + return "#ea580c" + if cvss_score >= 4.0: + return "#d97706" + if cvss_score >= 0.1: + return "#65a30d" + return "#6b7280" + + @classmethod + def render(cls, tool_data: dict[str, Any]) -> Static: # noqa: PLR0912, PLR0915 args = tool_data.get("args", {}) + result = tool_data.get("result", {}) title = args.get("title", "") - severity = args.get("severity", "") - content = args.get("content", "") + description = args.get("description", "") + impact = args.get("impact", "") + target = args.get("target", "") + technical_analysis = args.get("technical_analysis", "") + poc_description = args.get("poc_description", "") + poc_script_code = args.get("poc_script_code", "") + remediation_steps = args.get("remediation_steps", "") + + attack_vector = args.get("attack_vector", "") + attack_complexity = args.get("attack_complexity", "") + privileges_required = args.get("privileges_required", "") + user_interaction = args.get("user_interaction", "") + scope = args.get("scope", "") + confidentiality = args.get("confidentiality", "") + integrity = args.get("integrity", "") + availability = args.get("availability", "") + + endpoint = args.get("endpoint", "") + method = args.get("method", "") + cve = args.get("cve", "") + + severity = "" + cvss_score = None + if isinstance(result, dict): + severity = result.get("severity", "") + cvss_score = result.get("cvss_score") text = Text() text.append("🐞 ") text.append("Vulnerability Report", style="bold #ea580c") if title: - text.append("\n ") - text.append(title, style="bold") + text.append("\n\n") + text.append("Title: ", style=FIELD_STYLE) + text.append(title) - if severity: - severity_color = cls.SEVERITY_COLORS.get(severity.lower(), "#6b7280") - text.append("\n Severity: ") - text.append(severity.upper(), style=severity_color) + if severity: + text.append("\n\n") + text.append("Severity: ", style=FIELD_STYLE) + severity_color = cls.SEVERITY_COLORS.get(severity.lower(), "#6b7280") + text.append(severity.upper(), style=f"bold {severity_color}") - if content: - text.append("\n ") - text.append(content, style="dim") - else: + if cvss_score is not None: + text.append("\n\n") + text.append("CVSS Score: ", style=FIELD_STYLE) + cvss_color = cls._get_cvss_color(cvss_score) + text.append(str(cvss_score), style=f"bold {cvss_color}") + + if target: + text.append("\n\n") + text.append("Target: ", style=FIELD_STYLE) + text.append(target) + + if endpoint: + text.append("\n\n") + text.append("Endpoint: ", style=FIELD_STYLE) + text.append(endpoint) + + if method: + text.append("\n\n") + text.append("Method: ", style=FIELD_STYLE) + text.append(method) + + if cve: + text.append("\n\n") + text.append("CVE: ", style=FIELD_STYLE) + text.append(cve) + + if any( + [ + attack_vector, + attack_complexity, + privileges_required, + user_interaction, + scope, + confidentiality, + integrity, + availability, + ] + ): + text.append("\n\n") + cvss_parts = [] + if attack_vector: + cvss_parts.append(f"AV:{attack_vector}") + if attack_complexity: + cvss_parts.append(f"AC:{attack_complexity}") + if privileges_required: + cvss_parts.append(f"PR:{privileges_required}") + if user_interaction: + cvss_parts.append(f"UI:{user_interaction}") + if scope: + cvss_parts.append(f"S:{scope}") + if confidentiality: + cvss_parts.append(f"C:{confidentiality}") + if integrity: + cvss_parts.append(f"I:{integrity}") + if availability: + cvss_parts.append(f"A:{availability}") + text.append("CVSS Vector: ", style=FIELD_STYLE) + text.append("/".join(cvss_parts), style="dim") + + if description: + text.append("\n\n") + text.append("Description", style=FIELD_STYLE) + text.append("\n") + text.append(description) + + if impact: + text.append("\n\n") + text.append("Impact", style=FIELD_STYLE) + text.append("\n") + text.append(impact) + + if technical_analysis: + text.append("\n\n") + text.append("Technical Analysis", style=FIELD_STYLE) + text.append("\n") + text.append(technical_analysis) + + if poc_description: + text.append("\n\n") + text.append("PoC Description", style=FIELD_STYLE) + text.append("\n") + text.append(poc_description) + + if poc_script_code: + text.append("\n\n") + text.append("PoC Code", style=FIELD_STYLE) + text.append("\n") + text.append_text(cls._highlight_python(poc_script_code)) + + if remediation_steps: + text.append("\n\n") + text.append("Remediation", style=FIELD_STYLE) + text.append("\n") + text.append(remediation_steps) + + if not title: text.append("\n ") text.append("Creating report...", style="dim") diff --git a/strix/telemetry/tracer.py b/strix/telemetry/tracer.py index 8e2b491..8250bb3 100644 --- a/strix/telemetry/tracer.py +++ b/strix/telemetry/tracer.py @@ -71,47 +71,114 @@ class Tracer: return self._run_dir - def add_vulnerability_report( + def add_vulnerability_report( # noqa: PLR0912 self, title: str, - content: str, severity: str, + description: str | None = None, + impact: str | None = None, + target: str | None = None, + technical_analysis: str | None = None, + poc_description: str | None = None, + poc_script_code: str | None = None, + remediation_steps: str | None = None, + cvss: float | None = None, + cvss_breakdown: dict[str, str] | None = None, + endpoint: str | None = None, + method: str | None = None, + cve: str | None = None, + code_file: str | None = None, + code_before: str | None = None, + code_after: str | None = None, + code_diff: str | None = None, ) -> str: report_id = f"vuln-{len(self.vulnerability_reports) + 1:04d}" - report = { + report: dict[str, Any] = { "id": report_id, "title": title.strip(), - "content": content.strip(), "severity": severity.lower().strip(), "timestamp": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"), } + if description: + report["description"] = description.strip() + if impact: + report["impact"] = impact.strip() + if target: + report["target"] = target.strip() + if technical_analysis: + report["technical_analysis"] = technical_analysis.strip() + if poc_description: + report["poc_description"] = poc_description.strip() + if poc_script_code: + report["poc_script_code"] = poc_script_code.strip() + if remediation_steps: + report["remediation_steps"] = remediation_steps.strip() + if cvss is not None: + report["cvss"] = cvss + if cvss_breakdown: + report["cvss_breakdown"] = cvss_breakdown + if endpoint: + report["endpoint"] = endpoint.strip() + if method: + report["method"] = method.strip() + if cve: + report["cve"] = cve.strip() + if code_file: + report["code_file"] = code_file.strip() + if code_before: + report["code_before"] = code_before.strip() + if code_after: + report["code_after"] = code_after.strip() + if code_diff: + report["code_diff"] = code_diff.strip() + self.vulnerability_reports.append(report) logger.info(f"Added vulnerability report: {report_id} - {title}") if self.vulnerability_found_callback: self.vulnerability_found_callback( - report_id, title.strip(), content.strip(), severity.lower().strip() + report_id, title.strip(), description or "", severity.lower().strip() ) self.save_run_data() return report_id - def set_final_scan_result( + def update_scan_final_fields( self, - content: str, - success: bool = True, + executive_summary: str, + methodology: str, + technical_analysis: str, + recommendations: str, ) -> None: - self.final_scan_result = content.strip() - self.scan_results = { "scan_completed": True, - "content": content, - "success": success, + "executive_summary": executive_summary.strip(), + "methodology": methodology.strip(), + "technical_analysis": technical_analysis.strip(), + "recommendations": recommendations.strip(), + "success": True, } - logger.info(f"Set final scan result: success={success}") + self.final_scan_result = f"""# Executive Summary + +{executive_summary.strip()} + +# Methodology + +{methodology.strip()} + +# Technical Analysis + +{technical_analysis.strip()} + +# Recommendations + +{recommendations.strip()} +""" + + logger.info("Updated scan final fields") self.save_run_data(mark_complete=True) def log_agent_creation( @@ -204,7 +271,7 @@ class Tracer: ) self.get_run_dir() - def save_run_data(self, mark_complete: bool = False) -> None: + def save_run_data(self, mark_complete: bool = False) -> None: # noqa: PLR0912, PLR0915 try: run_dir = self.get_run_dir() if mark_complete: @@ -232,42 +299,89 @@ class Tracer: if report["id"] not in self._saved_vuln_ids ] + severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} + sorted_reports = sorted( + self.vulnerability_reports, + key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]), + ) + for report in new_reports: vuln_file = vuln_dir / f"{report['id']}.md" with vuln_file.open("w", encoding="utf-8") as f: - f.write(f"# {report['title']}\n\n") - f.write(f"**ID:** {report['id']}\n") - f.write(f"**Severity:** {report['severity'].upper()}\n") - f.write(f"**Found:** {report['timestamp']}\n\n") - f.write("## Description\n\n") - f.write(f"{report['content']}\n") + f.write(f"# {report.get('title', 'Untitled Vulnerability')}\n\n") + f.write(f"**ID:** {report.get('id', 'unknown')}\n") + f.write(f"**Severity:** {report.get('severity', 'unknown').upper()}\n") + f.write(f"**Found:** {report.get('timestamp', 'unknown')}\n") + + metadata_fields: list[tuple[str, Any]] = [ + ("Target", report.get("target")), + ("Endpoint", report.get("endpoint")), + ("Method", report.get("method")), + ("CVE", report.get("cve")), + ] + cvss_score = report.get("cvss") + if cvss_score is not None: + metadata_fields.append(("CVSS", cvss_score)) + + for label, value in metadata_fields: + if value: + f.write(f"**{label}:** {value}\n") + + f.write("\n## Description\n\n") + desc = report.get("description") or "No description provided." + f.write(f"{desc}\n\n") + + if report.get("impact"): + f.write("## Impact\n\n") + f.write(f"{report['impact']}\n\n") + + if report.get("technical_analysis"): + f.write("## Technical Analysis\n\n") + f.write(f"{report['technical_analysis']}\n\n") + + if report.get("poc_description") or report.get("poc_script_code"): + f.write("## Proof of Concept\n\n") + if report.get("poc_description"): + f.write(f"{report['poc_description']}\n\n") + if report.get("poc_script_code"): + f.write("```\n") + f.write(f"{report['poc_script_code']}\n") + f.write("```\n\n") + + if report.get("code_file") or report.get("code_diff"): + f.write("## Code Analysis\n\n") + if report.get("code_file"): + f.write(f"**File:** {report['code_file']}\n\n") + if report.get("code_diff"): + f.write("**Changes:**\n") + f.write("```diff\n") + f.write(f"{report['code_diff']}\n") + f.write("```\n\n") + + if report.get("remediation_steps"): + f.write("## Remediation\n\n") + f.write(f"{report['remediation_steps']}\n\n") + self._saved_vuln_ids.add(report["id"]) - if self.vulnerability_reports: - severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} - sorted_reports = sorted( - self.vulnerability_reports, - key=lambda x: (severity_order.get(x["severity"], 5), x["timestamp"]), - ) + vuln_csv_file = run_dir / "vulnerabilities.csv" + with vuln_csv_file.open("w", encoding="utf-8", newline="") as f: + import csv - vuln_csv_file = run_dir / "vulnerabilities.csv" - with vuln_csv_file.open("w", encoding="utf-8", newline="") as f: - import csv + fieldnames = ["id", "title", "severity", "timestamp", "file"] + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() - fieldnames = ["id", "title", "severity", "timestamp", "file"] - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - - for report in sorted_reports: - writer.writerow( - { - "id": report["id"], - "title": report["title"], - "severity": report["severity"].upper(), - "timestamp": report["timestamp"], - "file": f"vulnerabilities/{report['id']}.md", - } - ) + for report in sorted_reports: + writer.writerow( + { + "id": report["id"], + "title": report["title"], + "severity": report["severity"].upper(), + "timestamp": report["timestamp"], + "file": f"vulnerabilities/{report['id']}.md", + } + ) if new_reports: logger.info( diff --git a/strix/tools/finish/finish_actions.py b/strix/tools/finish/finish_actions.py index 42c3e67..79f48e7 100644 --- a/strix/tools/finish/finish_actions.py +++ b/strix/tools/finish/finish_actions.py @@ -4,49 +4,40 @@ from strix.tools.registry import register_tool def _validate_root_agent(agent_state: Any) -> dict[str, Any] | None: - if ( - agent_state is not None - and hasattr(agent_state, "parent_id") - and agent_state.parent_id is not None - ): + if agent_state and hasattr(agent_state, "parent_id") and agent_state.parent_id is not None: return { "success": False, - "message": ( - "This tool can only be used by the root/main agent. " - "Subagents must use agent_finish instead." - ), + "error": "finish_scan_wrong_agent", + "message": "This tool can only be used by the root/main agent", + "suggestion": "If you are a subagent, use agent_finish from agents_graph tool instead", } return None -def _validate_content(content: str) -> dict[str, Any] | None: - if not content or not content.strip(): - return {"success": False, "message": "Content cannot be empty"} - return None - - def _check_active_agents(agent_state: Any = None) -> dict[str, Any] | None: try: from strix.tools.agents_graph.agents_graph_actions import _agent_graph - current_agent_id = None - if agent_state and hasattr(agent_state, "agent_id"): + if agent_state and agent_state.agent_id: current_agent_id = agent_state.agent_id + else: + return None - running_agents = [] + active_agents = [] stopping_agents = [] - for agent_id, node in _agent_graph.get("nodes", {}).items(): + for agent_id, node in _agent_graph["nodes"].items(): if agent_id == current_agent_id: continue - status = node.get("status", "") + status = node.get("status", "unknown") if status == "running": - running_agents.append( + active_agents.append( { "id": agent_id, "name": node.get("name", "Unknown"), - "task": node.get("task", "No task description"), + "task": node.get("task", "Unknown task")[:300], + "status": status, } ) elif status == "stopping": @@ -54,121 +45,105 @@ def _check_active_agents(agent_state: Any = None) -> dict[str, Any] | None: { "id": agent_id, "name": node.get("name", "Unknown"), + "task": node.get("task", "Unknown task")[:300], + "status": status, } ) - if running_agents or stopping_agents: - message_parts = ["Cannot finish scan while other agents are still active:"] - - if running_agents: - message_parts.append("\n\nRunning agents:") - message_parts.extend( - [ - f" - {agent['name']} ({agent['id']}): {agent['task']}" - for agent in running_agents - ] - ) - - if stopping_agents: - message_parts.append("\n\nStopping agents:") - message_parts.extend( - [f" - {agent['name']} ({agent['id']})" for agent in stopping_agents] - ) - - message_parts.extend( - [ - "\n\nSuggested actions:", - "1. Use wait_for_message to wait for all agents to complete", - "2. Send messages to agents asking them to finish if urgent", - "3. Use view_agent_graph to monitor agent status", - ] - ) - - return { + if active_agents or stopping_agents: + response: dict[str, Any] = { "success": False, - "message": "\n".join(message_parts), - "active_agents": { - "running": len(running_agents), - "stopping": len(stopping_agents), - "details": { - "running": running_agents, - "stopping": stopping_agents, - }, - }, + "error": "agents_still_active", + "message": "Cannot finish scan: agents are still active", } + if active_agents: + response["active_agents"] = active_agents + + if stopping_agents: + response["stopping_agents"] = stopping_agents + + response["suggestions"] = [ + "Use wait_for_message to wait for all agents to complete", + "Use send_message_to_agent if you need agents to complete immediately", + "Check agent_status to see current agent states", + ] + + response["total_active"] = len(active_agents) + len(stopping_agents) + + return response + except ImportError: + pass + except Exception: import logging - logging.warning("Could not check agent graph status - agents_graph module unavailable") + logging.exception("Error checking active agents") return None -def _finalize_with_tracer(content: str, success: bool) -> dict[str, Any]: +@register_tool(sandbox_execution=False) +def finish_scan( + executive_summary: str, + methodology: str, + technical_analysis: str, + recommendations: str, + agent_state: Any = None, +) -> dict[str, Any]: + validation_error = _validate_root_agent(agent_state) + if validation_error: + return validation_error + + active_agents_error = _check_active_agents(agent_state) + if active_agents_error: + return active_agents_error + + validation_errors = [] + + if not executive_summary or not executive_summary.strip(): + validation_errors.append("Executive summary cannot be empty") + if not methodology or not methodology.strip(): + validation_errors.append("Methodology cannot be empty") + if not technical_analysis or not technical_analysis.strip(): + validation_errors.append("Technical analysis cannot be empty") + if not recommendations or not recommendations.strip(): + validation_errors.append("Recommendations cannot be empty") + + if validation_errors: + return {"success": False, "message": "Validation failed", "errors": validation_errors} + try: from strix.telemetry.tracer import get_global_tracer tracer = get_global_tracer() if tracer: - tracer.set_final_scan_result( - content=content.strip(), - success=success, + tracer.update_scan_final_fields( + executive_summary=executive_summary.strip(), + methodology=methodology.strip(), + technical_analysis=technical_analysis.strip(), + recommendations=recommendations.strip(), ) + vulnerability_count = len(tracer.vulnerability_reports) + return { "success": True, "scan_completed": True, - "message": "Scan completed successfully" - if success - else "Scan completed with errors", - "vulnerabilities_found": len(tracer.vulnerability_reports), + "message": "Scan completed successfully", + "vulnerabilities_found": vulnerability_count, } import logging - logging.warning("Global tracer not available - final scan result not stored") + logging.warning("Current tracer not available - scan results not stored") - return { # noqa: TRY300 - "success": True, - "scan_completed": True, - "message": "Scan completed successfully (not persisted)" - if success - else "Scan completed with errors (not persisted)", - "warning": "Final result could not be persisted - tracer unavailable", - } - - except ImportError: + except (ImportError, AttributeError) as e: + return {"success": False, "message": f"Failed to complete scan: {e!s}"} + else: return { "success": True, "scan_completed": True, - "message": "Scan completed successfully (not persisted)" - if success - else "Scan completed with errors (not persisted)", - "warning": "Final result could not be persisted - tracer module unavailable", + "message": "Scan completed (not persisted)", + "warning": "Results could not be persisted - tracer unavailable", } - - -@register_tool(sandbox_execution=False) -def finish_scan( - content: str, - success: bool = True, - agent_state: Any = None, -) -> dict[str, Any]: - try: - validation_error = _validate_root_agent(agent_state) - if validation_error: - return validation_error - - validation_error = _validate_content(content) - if validation_error: - return validation_error - - active_agents_error = _check_active_agents(agent_state) - if active_agents_error: - return active_agents_error - - return _finalize_with_tracer(content, success) - - except (ValueError, TypeError, KeyError) as e: - return {"success": False, "message": f"Failed to complete scan: {e!s}"} diff --git a/strix/tools/finish/finish_actions_schema.xml b/strix/tools/finish/finish_actions_schema.xml index dc8ddde..d9c1bb7 100644 --- a/strix/tools/finish/finish_actions_schema.xml +++ b/strix/tools/finish/finish_actions_schema.xml @@ -1,6 +1,6 @@ - Complete the main security scan and generate final report. + Complete the security scan by providing the final assessment fields as full penetration test report. IMPORTANT: This tool can ONLY be used by the root/main agent. Subagents must use agent_finish from agents_graph tool instead. @@ -8,11 +8,20 @@ Subagents must use agent_finish from agents_graph tool instead. IMPORTANT: This tool will NOT allow finishing if any agents are still running or stopping. You must wait for all agents to complete before using this tool. -This tool MUST be called at the very end of the security assessment to: -- Verify all agents have completed their tasks -- Generate the final comprehensive scan report -- Mark the entire scan as completed -- Stop the agent from running +This tool directly updates the scan report data: +- executive_summary +- methodology +- technical_analysis +- recommendations + +All fields are REQUIRED and map directly to the final report. + +This must be the last tool called in the scan. It will: +1. Verify you are the root agent +2. Check all subagents have completed +3. Update the scan with your provided fields +4. Mark the scan as completed +5. Stop agent execution Use this tool when: - You are the main/root agent conducting the security assessment @@ -23,23 +32,39 @@ Use this tool when: IMPORTANT: Calling this tool multiple times will OVERWRITE any previous scan report. Make sure you include ALL findings and details in a single comprehensive report. -If agents are still running, this tool will: +If agents are still running, the tool will: - Show you which agents are still active - Suggest using wait_for_message to wait for completion - Suggest messaging agents if immediate completion is needed -Put ALL details in the content - methodology, tools used, vulnerability counts, key findings, recommendations, -compliance notes, risk assessments, next steps, etc. Be comprehensive and include everything relevant. +NOTE: Make sure the vulnerabilities found were reported with create_vulnerability_report tool, otherwise they will not be tracked and you will not be rewarded. +But make sure to not report the same vulnerability multiple times. + +Professional, customer-facing penetration test report rules (PDF-ready): +- Do NOT include internal or system details: never mention local/absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection/tooling issues, or tester environment details. +- Tone and style: formal, objective, third-person, concise. No internal checklists or engineering runbooks. Content must read as a polished client deliverable. +- Structure across fields should align to standard pentest reports: + - Executive summary: business impact, risk posture, notable criticals, remediation theme. + - Methodology: industry-standard methods (e.g., OWASP, OSSTMM, NIST), scope, constraints—no internal execution notes. + - Technical analysis: consolidated findings overview referencing created vulnerability reports; avoid raw logs. + - Recommendations: prioritized, actionable, aligned to risk and best practices. + - - Complete scan report including executive summary, methodology, findings, vulnerability details, recommendations, compliance notes, risk assessment, and conclusions. Include everything relevant to the assessment. + + High-level summary for executives: key findings, overall security posture, critical risks, business impact - - Whether the scan completed successfully without critical errors + + Testing methodology: approach, tools used, scope, techniques employed + + + Detailed technical findings and security assessment results over the scan + + + Actionable security recommendations and remediation priorities - Response containing success status and completion message. If agents are still running, returns details about active agents and suggested actions. + Response containing success status, vulnerability count, and completion message. If agents are still running, returns details about active agents and suggested actions. diff --git a/strix/tools/reporting/reporting_actions.py b/strix/tools/reporting/reporting_actions.py index dd98d6d..13fe55d 100644 --- a/strix/tools/reporting/reporting_actions.py +++ b/strix/tools/reporting/reporting_actions.py @@ -3,61 +3,209 @@ from typing import Any from strix.tools.registry import register_tool +def calculate_cvss_and_severity( + attack_vector: str, + attack_complexity: str, + privileges_required: str, + user_interaction: str, + scope: str, + confidentiality: str, + integrity: str, + availability: str, +) -> tuple[float, str, str]: + try: + from cvss import CVSS3 + + vector = ( + f"CVSS:3.1/AV:{attack_vector}/AC:{attack_complexity}/" + f"PR:{privileges_required}/UI:{user_interaction}/S:{scope}/" + f"C:{confidentiality}/I:{integrity}/A:{availability}" + ) + + c = CVSS3(vector) + scores = c.scores() + severities = c.severities() + + base_score = scores[0] + base_severity = severities[0] + + severity = base_severity.lower() + + except Exception: + import logging + + logging.exception("Failed to calculate CVSS") + return 7.5, "high", "" + else: + return base_score, severity, vector + + +def _validate_required_fields(**kwargs: str | None) -> list[str]: + validation_errors: list[str] = [] + + required_fields = { + "title": "Title cannot be empty", + "description": "Description cannot be empty", + "impact": "Impact cannot be empty", + "target": "Target cannot be empty", + "technical_analysis": "Technical analysis cannot be empty", + "poc_description": "PoC description cannot be empty", + "poc_script_code": "PoC script/code is REQUIRED - provide the actual exploit/payload", + "remediation_steps": "Remediation steps cannot be empty", + } + + for field_name, error_msg in required_fields.items(): + value = kwargs.get(field_name) + if not value or not str(value).strip(): + validation_errors.append(error_msg) + + return validation_errors + + +def _validate_cvss_parameters(**kwargs: str) -> list[str]: + validation_errors: list[str] = [] + + cvss_validations = { + "attack_vector": ["N", "A", "L", "P"], + "attack_complexity": ["L", "H"], + "privileges_required": ["N", "L", "H"], + "user_interaction": ["N", "R"], + "scope": ["U", "C"], + "confidentiality": ["N", "L", "H"], + "integrity": ["N", "L", "H"], + "availability": ["N", "L", "H"], + } + + for param_name, valid_values in cvss_validations.items(): + value = kwargs.get(param_name) + if value not in valid_values: + validation_errors.append( + f"Invalid {param_name}: {value}. Must be one of: {valid_values}" + ) + + return validation_errors + + @register_tool(sandbox_execution=False) def create_vulnerability_report( title: str, - content: str, - severity: str, + description: str, + impact: str, + target: str, + technical_analysis: str, + poc_description: str, + poc_script_code: str, + remediation_steps: str, + # CVSS Breakdown Components + attack_vector: str, + attack_complexity: str, + privileges_required: str, + user_interaction: str, + scope: str, + confidentiality: str, + integrity: str, + availability: str, + # Optional fields + endpoint: str | None = None, + method: str | None = None, + cve: str | None = None, + code_file: str | None = None, + code_before: str | None = None, + code_after: str | None = None, + code_diff: str | None = None, ) -> dict[str, Any]: - validation_error = None - if not title or not title.strip(): - validation_error = "Title cannot be empty" - elif not content or not content.strip(): - validation_error = "Content cannot be empty" - elif not severity or not severity.strip(): - validation_error = "Severity cannot be empty" - else: - valid_severities = ["critical", "high", "medium", "low", "info"] - if severity.lower() not in valid_severities: - validation_error = ( - f"Invalid severity '{severity}'. Must be one of: {', '.join(valid_severities)}" - ) + validation_errors = _validate_required_fields( + title=title, + description=description, + impact=impact, + target=target, + technical_analysis=technical_analysis, + poc_description=poc_description, + poc_script_code=poc_script_code, + remediation_steps=remediation_steps, + ) - if validation_error: - return {"success": False, "message": validation_error} + validation_errors.extend( + _validate_cvss_parameters( + attack_vector=attack_vector, + attack_complexity=attack_complexity, + privileges_required=privileges_required, + user_interaction=user_interaction, + scope=scope, + confidentiality=confidentiality, + integrity=integrity, + availability=availability, + ) + ) + + if validation_errors: + return {"success": False, "message": "Validation failed", "errors": validation_errors} + + cvss_score, severity, cvss_vector = calculate_cvss_and_severity( + attack_vector, + attack_complexity, + privileges_required, + user_interaction, + scope, + confidentiality, + integrity, + availability, + ) try: from strix.telemetry.tracer import get_global_tracer tracer = get_global_tracer() if tracer: + cvss_breakdown = { + "attack_vector": attack_vector, + "attack_complexity": attack_complexity, + "privileges_required": privileges_required, + "user_interaction": user_interaction, + "scope": scope, + "confidentiality": confidentiality, + "integrity": integrity, + "availability": availability, + } + report_id = tracer.add_vulnerability_report( title=title, - content=content, + description=description, severity=severity, + impact=impact, + target=target, + technical_analysis=technical_analysis, + poc_description=poc_description, + poc_script_code=poc_script_code, + remediation_steps=remediation_steps, + cvss=cvss_score, + cvss_breakdown=cvss_breakdown, + endpoint=endpoint, + method=method, + cve=cve, + code_file=code_file, + code_before=code_before, + code_after=code_after, + code_diff=code_diff, ) return { "success": True, "message": f"Vulnerability report '{title}' created successfully", "report_id": report_id, - "severity": severity.lower(), + "severity": severity, + "cvss_score": cvss_score, } + import logging - logging.warning("Global tracer not available - vulnerability report not stored") + logging.warning("Current tracer not available - vulnerability report not stored") - return { # noqa: TRY300 - "success": True, - "message": f"Vulnerability report '{title}' created successfully (not persisted)", - "warning": "Report could not be persisted - tracer unavailable", - } - - except ImportError: + except (ImportError, AttributeError) as e: + return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"} + else: return { "success": True, - "message": f"Vulnerability report '{title}' created successfully (not persisted)", - "warning": "Report could not be persisted - tracer module unavailable", + "message": f"Vulnerability report '{title}' created (not persisted)", + "warning": "Report could not be persisted - tracer unavailable", } - except (ValueError, TypeError) as e: - return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"} diff --git a/strix/tools/reporting/reporting_actions_schema.xml b/strix/tools/reporting/reporting_actions_schema.xml index 2e47d60..8b6b9ae 100644 --- a/strix/tools/reporting/reporting_actions_schema.xml +++ b/strix/tools/reporting/reporting_actions_schema.xml @@ -2,8 +2,7 @@ Create a vulnerability report for a discovered security issue. -Use this tool to document a specific verified security vulnerability. -Put ALL details in the content field - affected URLs, parameters, proof of concept, remediation steps, CVE references, CVSS scores, technical details, impact assessment, etc. +Use this tool to document a specific fully verified security vulnerability. DO NOT USE: - For general security observations without specific vulnerabilities @@ -11,20 +10,119 @@ DO NOT USE: - When you don't have a proof of concept, or still not 100% sure if it's a vulnerability - For tracking multiple vulnerabilities (create separate reports) - For reporting multiple vulnerabilities at once. Use a separate create_vulnerability_report for each vulnerability. + +White-box requirement (when you have access to the code): You MUST include code_file, code_before, code_after, and code_diff. These must contain the actual code (before/after) and a complete, apply-able unified diff. + +Professional, customer-facing report rules (PDF-ready): +- Do NOT include internal or system details: never mention local or absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection issues, internal errors/logs/stack traces, or tester machine environment details. +- Tone and style: formal, objective, third-person, vendor-neutral, concise. No runbooks, checklists, or engineering notes. Avoid headings like "QUICK", "Approach", or "Techniques" that read like internal guidance. +- Use a standard penetration testing report structure per finding: + 1) Overview + 2) Severity and CVSS (vector only) + 3) Affected asset(s) + 4) Technical details + 5) Proof of concept (repro steps plus code) + 6) Impact + 7) Remediation + 8) Evidence (optional request/response excerpts, etc.) in the technical analysis field. +- Numbered steps are allowed ONLY within the proof of concept. Elsewhere, use clear, concise paragraphs suitable for customer-facing reports. +- Language must be precise and non-vague; avoid hedging. - Clear, concise title of the vulnerability + Clear, specific title (e.g., "SQL Injection in /api/users Login Parameter"). But not too long. Don't mention CVE number in the title. - - Complete vulnerability details including affected URLs, technical details, impact, proof of concept, remediation steps, and any relevant references. Be comprehensive and include everything relevant. + + Comprehensive description of the vulnerability and how it was discovered - - Severity level: critical, high, medium, low, or info + + Impact assessment: what attacker can do, business risk, data at risk + + + Affected target: URL, domain, or Git repository + + + Technical explanation of the vulnerability mechanism and root cause + + + Step-by-step instructions to reproduce the vulnerability + + + Actual proof of concept code, exploit, payload, or script that demonstrates the vulnerability. Python code. + + + Specific, actionable steps to fix the vulnerability + + + CVSS Attack Vector - How the vulnerability is exploited: +N = Network (remotely exploitable) +A = Adjacent (same network segment) +L = Local (local access required) +P = Physical (physical access required) + + + CVSS Attack Complexity - Conditions beyond attacker's control: +L = Low (no special conditions) +H = High (special conditions must exist) + + + CVSS Privileges Required - Level of privileges needed: +N = None (no privileges needed) +L = Low (basic user privileges) +H = High (admin privileges) + + + CVSS User Interaction - Does exploit require user action: +N = None (no user interaction needed) +R = Required (user must perform some action) + + + CVSS Scope - Can the vulnerability affect resources beyond its security scope: +U = Unchanged (only affects the vulnerable component) +C = Changed (affects resources beyond vulnerable component) + + + CVSS Confidentiality Impact - Impact to confidentiality: +N = None (no impact) +L = Low (some information disclosure) +H = High (all information disclosed) + + + CVSS Integrity Impact - Impact to integrity: +N = None (no impact) +L = Low (data can be modified but scope is limited) +H = High (total loss of integrity) + + + CVSS Availability Impact - Impact to availability: +N = None (no impact) +L = Low (reduced performance or interruptions) +H = High (total loss of availability) + + + API endpoint(s) or URL path(s) (e.g., "/api/login") - for web vulnerabilities, or Git repository path(s) - for code vulnerabilities + + + HTTP method(s) (GET, POST, etc.) - for web vulnerabilities. + + + CVE identifier (e.g., "CVE-2024-1234"). Make sure it's a valid CVE. Use web search or vulnerability databases to make sure it's a valid CVE number. + + + MANDATORY for white-box testing: exact affected source file path(s). + + + MANDATORY for white-box testing: actual vulnerable code snippet(s) copied verbatim from the repository. + + + MANDATORY for white-box testing: corrected code snippet(s) exactly as they should appear after the fix. + + + MANDATORY for white-box testing: unified diff showing the code changes. Must be a complete, apply-able unified diff (git format) covering all affected files, with proper file headers, line numbers, and sufficient context. - Response containing success status and message + Response containing success=true, message, report_id, severity, cvss_score