feat(reporting): enhance vulnerability reporting with detailed fields and CVSS calculation
This commit is contained in:
@@ -3,61 +3,209 @@ from typing import Any
|
||||
from strix.tools.registry import register_tool
|
||||
|
||||
|
||||
def calculate_cvss_and_severity(
|
||||
attack_vector: str,
|
||||
attack_complexity: str,
|
||||
privileges_required: str,
|
||||
user_interaction: str,
|
||||
scope: str,
|
||||
confidentiality: str,
|
||||
integrity: str,
|
||||
availability: str,
|
||||
) -> tuple[float, str, str]:
|
||||
try:
|
||||
from cvss import CVSS3
|
||||
|
||||
vector = (
|
||||
f"CVSS:3.1/AV:{attack_vector}/AC:{attack_complexity}/"
|
||||
f"PR:{privileges_required}/UI:{user_interaction}/S:{scope}/"
|
||||
f"C:{confidentiality}/I:{integrity}/A:{availability}"
|
||||
)
|
||||
|
||||
c = CVSS3(vector)
|
||||
scores = c.scores()
|
||||
severities = c.severities()
|
||||
|
||||
base_score = scores[0]
|
||||
base_severity = severities[0]
|
||||
|
||||
severity = base_severity.lower()
|
||||
|
||||
except Exception:
|
||||
import logging
|
||||
|
||||
logging.exception("Failed to calculate CVSS")
|
||||
return 7.5, "high", ""
|
||||
else:
|
||||
return base_score, severity, vector
|
||||
|
||||
|
||||
def _validate_required_fields(**kwargs: str | None) -> list[str]:
|
||||
validation_errors: list[str] = []
|
||||
|
||||
required_fields = {
|
||||
"title": "Title cannot be empty",
|
||||
"description": "Description cannot be empty",
|
||||
"impact": "Impact cannot be empty",
|
||||
"target": "Target cannot be empty",
|
||||
"technical_analysis": "Technical analysis cannot be empty",
|
||||
"poc_description": "PoC description cannot be empty",
|
||||
"poc_script_code": "PoC script/code is REQUIRED - provide the actual exploit/payload",
|
||||
"remediation_steps": "Remediation steps cannot be empty",
|
||||
}
|
||||
|
||||
for field_name, error_msg in required_fields.items():
|
||||
value = kwargs.get(field_name)
|
||||
if not value or not str(value).strip():
|
||||
validation_errors.append(error_msg)
|
||||
|
||||
return validation_errors
|
||||
|
||||
|
||||
def _validate_cvss_parameters(**kwargs: str) -> list[str]:
|
||||
validation_errors: list[str] = []
|
||||
|
||||
cvss_validations = {
|
||||
"attack_vector": ["N", "A", "L", "P"],
|
||||
"attack_complexity": ["L", "H"],
|
||||
"privileges_required": ["N", "L", "H"],
|
||||
"user_interaction": ["N", "R"],
|
||||
"scope": ["U", "C"],
|
||||
"confidentiality": ["N", "L", "H"],
|
||||
"integrity": ["N", "L", "H"],
|
||||
"availability": ["N", "L", "H"],
|
||||
}
|
||||
|
||||
for param_name, valid_values in cvss_validations.items():
|
||||
value = kwargs.get(param_name)
|
||||
if value not in valid_values:
|
||||
validation_errors.append(
|
||||
f"Invalid {param_name}: {value}. Must be one of: {valid_values}"
|
||||
)
|
||||
|
||||
return validation_errors
|
||||
|
||||
|
||||
@register_tool(sandbox_execution=False)
|
||||
def create_vulnerability_report(
|
||||
title: str,
|
||||
content: str,
|
||||
severity: str,
|
||||
description: str,
|
||||
impact: str,
|
||||
target: str,
|
||||
technical_analysis: str,
|
||||
poc_description: str,
|
||||
poc_script_code: str,
|
||||
remediation_steps: str,
|
||||
# CVSS Breakdown Components
|
||||
attack_vector: str,
|
||||
attack_complexity: str,
|
||||
privileges_required: str,
|
||||
user_interaction: str,
|
||||
scope: str,
|
||||
confidentiality: str,
|
||||
integrity: str,
|
||||
availability: str,
|
||||
# Optional fields
|
||||
endpoint: str | None = None,
|
||||
method: str | None = None,
|
||||
cve: str | None = None,
|
||||
code_file: str | None = None,
|
||||
code_before: str | None = None,
|
||||
code_after: str | None = None,
|
||||
code_diff: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
validation_error = None
|
||||
if not title or not title.strip():
|
||||
validation_error = "Title cannot be empty"
|
||||
elif not content or not content.strip():
|
||||
validation_error = "Content cannot be empty"
|
||||
elif not severity or not severity.strip():
|
||||
validation_error = "Severity cannot be empty"
|
||||
else:
|
||||
valid_severities = ["critical", "high", "medium", "low", "info"]
|
||||
if severity.lower() not in valid_severities:
|
||||
validation_error = (
|
||||
f"Invalid severity '{severity}'. Must be one of: {', '.join(valid_severities)}"
|
||||
)
|
||||
validation_errors = _validate_required_fields(
|
||||
title=title,
|
||||
description=description,
|
||||
impact=impact,
|
||||
target=target,
|
||||
technical_analysis=technical_analysis,
|
||||
poc_description=poc_description,
|
||||
poc_script_code=poc_script_code,
|
||||
remediation_steps=remediation_steps,
|
||||
)
|
||||
|
||||
if validation_error:
|
||||
return {"success": False, "message": validation_error}
|
||||
validation_errors.extend(
|
||||
_validate_cvss_parameters(
|
||||
attack_vector=attack_vector,
|
||||
attack_complexity=attack_complexity,
|
||||
privileges_required=privileges_required,
|
||||
user_interaction=user_interaction,
|
||||
scope=scope,
|
||||
confidentiality=confidentiality,
|
||||
integrity=integrity,
|
||||
availability=availability,
|
||||
)
|
||||
)
|
||||
|
||||
if validation_errors:
|
||||
return {"success": False, "message": "Validation failed", "errors": validation_errors}
|
||||
|
||||
cvss_score, severity, cvss_vector = calculate_cvss_and_severity(
|
||||
attack_vector,
|
||||
attack_complexity,
|
||||
privileges_required,
|
||||
user_interaction,
|
||||
scope,
|
||||
confidentiality,
|
||||
integrity,
|
||||
availability,
|
||||
)
|
||||
|
||||
try:
|
||||
from strix.telemetry.tracer import get_global_tracer
|
||||
|
||||
tracer = get_global_tracer()
|
||||
if tracer:
|
||||
cvss_breakdown = {
|
||||
"attack_vector": attack_vector,
|
||||
"attack_complexity": attack_complexity,
|
||||
"privileges_required": privileges_required,
|
||||
"user_interaction": user_interaction,
|
||||
"scope": scope,
|
||||
"confidentiality": confidentiality,
|
||||
"integrity": integrity,
|
||||
"availability": availability,
|
||||
}
|
||||
|
||||
report_id = tracer.add_vulnerability_report(
|
||||
title=title,
|
||||
content=content,
|
||||
description=description,
|
||||
severity=severity,
|
||||
impact=impact,
|
||||
target=target,
|
||||
technical_analysis=technical_analysis,
|
||||
poc_description=poc_description,
|
||||
poc_script_code=poc_script_code,
|
||||
remediation_steps=remediation_steps,
|
||||
cvss=cvss_score,
|
||||
cvss_breakdown=cvss_breakdown,
|
||||
endpoint=endpoint,
|
||||
method=method,
|
||||
cve=cve,
|
||||
code_file=code_file,
|
||||
code_before=code_before,
|
||||
code_after=code_after,
|
||||
code_diff=code_diff,
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Vulnerability report '{title}' created successfully",
|
||||
"report_id": report_id,
|
||||
"severity": severity.lower(),
|
||||
"severity": severity,
|
||||
"cvss_score": cvss_score,
|
||||
}
|
||||
|
||||
import logging
|
||||
|
||||
logging.warning("Global tracer not available - vulnerability report not stored")
|
||||
logging.warning("Current tracer not available - vulnerability report not stored")
|
||||
|
||||
return { # noqa: TRY300
|
||||
"success": True,
|
||||
"message": f"Vulnerability report '{title}' created successfully (not persisted)",
|
||||
"warning": "Report could not be persisted - tracer unavailable",
|
||||
}
|
||||
|
||||
except ImportError:
|
||||
except (ImportError, AttributeError) as e:
|
||||
return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"}
|
||||
else:
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Vulnerability report '{title}' created successfully (not persisted)",
|
||||
"warning": "Report could not be persisted - tracer module unavailable",
|
||||
"message": f"Vulnerability report '{title}' created (not persisted)",
|
||||
"warning": "Report could not be persisted - tracer unavailable",
|
||||
}
|
||||
except (ValueError, TypeError) as e:
|
||||
return {"success": False, "message": f"Failed to create vulnerability report: {e!s}"}
|
||||
|
||||
Reference in New Issue
Block a user