From 01ae348da8306eb9fc393711be864a14bf49960f Mon Sep 17 00:00:00 2001
From: 0xallam
Date: Wed, 7 Jan 2026 19:23:17 -0800
Subject: [PATCH] feat(reporting): add LLM-based vulnerability deduplication

- Add dedupe.py with XML-based LLM deduplication using direct litellm calls
- Integrate deduplication check in create_vulnerability_report tool
- Add get_existing_vulnerabilities() method to tracer for fetching reports
- Update schema and system prompt with deduplication guidelines
---
 strix/agents/StrixAgent/system_prompt.jinja |   1 +
 strix/llm/dedupe.py                         | 217 ++++++++++++++++++
 strix/telemetry/tracer.py                   |   3 +
 strix/tools/reporting/reporting_actions.py  |  39 ++++
 .../reporting/reporting_actions_schema.xml  |   9 +-
 5 files changed, 268 insertions(+), 1 deletion(-)
 create mode 100644 strix/llm/dedupe.py

diff --git a/strix/agents/StrixAgent/system_prompt.jinja b/strix/agents/StrixAgent/system_prompt.jinja
index 49ee142..99c21b0 100644
--- a/strix/agents/StrixAgent/system_prompt.jinja
+++ b/strix/agents/StrixAgent/system_prompt.jinja
@@ -134,6 +134,7 @@ VALIDATION REQUIREMENTS:
 - Keep going until you find something that matters
 - A vulnerability is ONLY considered reported when a reporting agent uses create_vulnerability_report with full details. Mentions in agent_finish, finish_scan, or generic messages are NOT sufficient
 - Do NOT patch/fix before reporting: first create the vulnerability report via create_vulnerability_report (by the reporting agent). Only after reporting is completed should fixing/patching proceed
+- DEDUPLICATION: The create_vulnerability_report tool uses LLM-based deduplication. If it rejects your report as a duplicate, DO NOT attempt to re-submit the same vulnerability. Accept the rejection and move on to testing other areas.
The vulnerability has already been reported by another agent diff --git a/strix/llm/dedupe.py b/strix/llm/dedupe.py new file mode 100644 index 0000000..3e74137 --- /dev/null +++ b/strix/llm/dedupe.py @@ -0,0 +1,217 @@ +import json +import logging +import os +import re +from typing import Any + +import litellm + + +logger = logging.getLogger(__name__) + +DEDUPE_SYSTEM_PROMPT = """You are an expert vulnerability report deduplication judge. +Your task is to determine if a candidate vulnerability report describes the SAME vulnerability +as any existing report. + +CRITICAL DEDUPLICATION RULES: + +1. SAME VULNERABILITY means: + - Same root cause (e.g., "missing input validation" not just "SQL injection") + - Same affected component/endpoint/file (exact match or clear overlap) + - Same exploitation method or attack vector + - Would be fixed by the same code change/patch + +2. NOT DUPLICATES if: + - Different endpoints even with same vulnerability type (e.g., SQLi in /login vs /search) + - Different parameters in same endpoint (e.g., XSS in 'name' vs 'comment' field) + - Different root causes (e.g., stored XSS vs reflected XSS in same field) + - Different severity levels due to different impact + - One is authenticated, other is unauthenticated + +3. ARE DUPLICATES even if: + - Titles are worded differently + - Descriptions have different level of detail + - PoC uses different payloads but exploits same issue + - One report is more thorough than another + - Minor variations in technical analysis + +COMPARISON GUIDELINES: +- Focus on the technical root cause, not surface-level similarities +- Same vulnerability type (SQLi, XSS) doesn't mean duplicate - location matters +- Consider the fix: would fixing one also fix the other? 
+- When uncertain, lean towards NOT duplicate + +FIELDS TO ANALYZE: +- title, description: General vulnerability info +- target, endpoint, method: Exact location of vulnerability +- technical_analysis: Root cause details +- poc_description: How it's exploited +- impact: What damage it can cause + +YOU MUST RESPOND WITH EXACTLY THIS XML FORMAT AND NOTHING ELSE: + + +true +vuln-0001 +0.95 +Both reports describe SQL injection in /api/login via the username parameter + + +OR if not a duplicate: + + +false + +0.90 +Different endpoints: candidate is /api/search, existing is /api/login + + +RULES: +- is_duplicate MUST be exactly "true" or "false" (lowercase) +- duplicate_id MUST be the exact ID from existing reports or empty if not duplicate +- confidence MUST be a decimal (your confidence level in the decision) +- reason MUST be a specific explanation mentioning endpoint/parameter/root cause +- DO NOT include any text outside the tags""" + + +def _prepare_report_for_comparison(report: dict[str, Any]) -> dict[str, Any]: + relevant_fields = [ + "id", + "title", + "description", + "impact", + "target", + "technical_analysis", + "poc_description", + "endpoint", + "method", + ] + + cleaned = {} + for field in relevant_fields: + if report.get(field): + value = report[field] + if isinstance(value, str) and len(value) > 2000: + value = value[:2000] + "...[truncated]" + cleaned[field] = value + + return cleaned + + +def _extract_xml_field(content: str, field: str) -> str: + pattern = rf"<{field}>(.*?)" + match = re.search(pattern, content, re.DOTALL | re.IGNORECASE) + if match: + return match.group(1).strip() + return "" + + +def _parse_dedupe_response(content: str) -> dict[str, Any]: + result_match = re.search( + r"(.*?)", content, re.DOTALL | re.IGNORECASE + ) + + if not result_match: + logger.warning(f"No block found in response: {content[:500]}") + raise ValueError("No block found in response") + + result_content = result_match.group(1) + + is_duplicate_str = 
_extract_xml_field(result_content, "is_duplicate") + duplicate_id = _extract_xml_field(result_content, "duplicate_id") + confidence_str = _extract_xml_field(result_content, "confidence") + reason = _extract_xml_field(result_content, "reason") + + is_duplicate = is_duplicate_str.lower() == "true" + + try: + confidence = float(confidence_str) if confidence_str else 0.0 + except ValueError: + confidence = 0.0 + + return { + "is_duplicate": is_duplicate, + "duplicate_id": duplicate_id[:64] if duplicate_id else "", + "confidence": confidence, + "reason": reason[:500] if reason else "", + } + + +def check_duplicate( + candidate: dict[str, Any], existing_reports: list[dict[str, Any]] +) -> dict[str, Any]: + if not existing_reports: + return { + "is_duplicate": False, + "duplicate_id": "", + "confidence": 1.0, + "reason": "No existing reports to compare against", + } + + try: + candidate_cleaned = _prepare_report_for_comparison(candidate) + existing_cleaned = [_prepare_report_for_comparison(r) for r in existing_reports] + + comparison_data = {"candidate": candidate_cleaned, "existing_reports": existing_cleaned} + + model_name = os.getenv("STRIX_LLM", "openai/gpt-5") + api_key = os.getenv("LLM_API_KEY") + api_base = ( + os.getenv("LLM_API_BASE") + or os.getenv("OPENAI_API_BASE") + or os.getenv("LITELLM_BASE_URL") + or os.getenv("OLLAMA_API_BASE") + ) + + messages = [ + {"role": "system", "content": DEDUPE_SYSTEM_PROMPT}, + { + "role": "user", + "content": ( + f"Compare this candidate vulnerability against existing reports:\n\n" + f"{json.dumps(comparison_data, indent=2)}\n\n" + f"Respond with ONLY the XML block." 
+ ), + }, + ] + + completion_kwargs: dict[str, Any] = { + "model": model_name, + "messages": messages, + "timeout": 120, + "temperature": 0, + } + if api_key: + completion_kwargs["api_key"] = api_key + if api_base: + completion_kwargs["api_base"] = api_base + + response = litellm.completion(**completion_kwargs) + + content = response.choices[0].message.content + if not content: + return { + "is_duplicate": False, + "duplicate_id": "", + "confidence": 0.0, + "reason": "Empty response from LLM", + } + + result = _parse_dedupe_response(content) + + logger.info( + f"Deduplication check: is_duplicate={result['is_duplicate']}, " + f"confidence={result['confidence']}, reason={result['reason'][:100]}" + ) + + except Exception as e: + logger.exception("Error during vulnerability deduplication check") + return { + "is_duplicate": False, + "duplicate_id": "", + "confidence": 0.0, + "reason": f"Deduplication check failed: {e}", + "error": str(e), + } + else: + return result diff --git a/strix/telemetry/tracer.py b/strix/telemetry/tracer.py index 8250bb3..26890ad 100644 --- a/strix/telemetry/tracer.py +++ b/strix/telemetry/tracer.py @@ -145,6 +145,9 @@ class Tracer: self.save_run_data() return report_id + def get_existing_vulnerabilities(self) -> list[dict[str, Any]]: + return list(self.vulnerability_reports) + def update_scan_final_fields( self, executive_summary: str, diff --git a/strix/tools/reporting/reporting_actions.py b/strix/tools/reporting/reporting_actions.py index 13fe55d..c307d93 100644 --- a/strix/tools/reporting/reporting_actions.py +++ b/strix/tools/reporting/reporting_actions.py @@ -157,6 +157,45 @@ def create_vulnerability_report( tracer = get_global_tracer() if tracer: + from strix.llm.dedupe import check_duplicate + + existing_reports = tracer.get_existing_vulnerabilities() + + candidate = { + "title": title, + "description": description, + "impact": impact, + "target": target, + "technical_analysis": technical_analysis, + "poc_description": poc_description, 
+            "poc_script_code": poc_script_code,
+            "endpoint": endpoint,
+            "method": method,
+        }
+
+        dedupe_result = check_duplicate(candidate, existing_reports)
+
+        if dedupe_result.get("is_duplicate"):
+            duplicate_id = dedupe_result.get("duplicate_id", "")
+
+            duplicate_title = ""
+            for report in existing_reports:
+                if report.get("id") == duplicate_id:
+                    duplicate_title = report.get("title", "Unknown")
+                    break
+
+            return {
+                "success": False,
+                "message": (
+                    f"Potential duplicate of '{duplicate_title}' "
+                    f"(id={duplicate_id[:8]}...). Do not re-report the same vulnerability."
+                ),
+                "duplicate_of": duplicate_id,
+                "duplicate_title": duplicate_title,
+                "confidence": dedupe_result.get("confidence", 0.0),
+                "reason": dedupe_result.get("reason", ""),
+            }
+
     cvss_breakdown = {
         "attack_vector": attack_vector,
         "attack_complexity": attack_complexity,
diff --git a/strix/tools/reporting/reporting_actions_schema.xml b/strix/tools/reporting/reporting_actions_schema.xml
index 8b6b9ae..a5eee40 100644
--- a/strix/tools/reporting/reporting_actions_schema.xml
+++ b/strix/tools/reporting/reporting_actions_schema.xml
@@ -2,6 +2,8 @@
 Create a vulnerability report for a discovered security issue.
 
+IMPORTANT: This tool includes automatic LLM-based deduplication. Reports that describe the same vulnerability (same root cause on the same asset) as an existing report will be rejected.
+
 Use this tool to document a specific fully verified security vulnerability.
 
 DO NOT USE:
@@ -10,9 +12,12 @@ DO NOT USE:
 - When you don't have a proof of concept, or still not 100% sure if it's a vulnerability
 - For tracking multiple vulnerabilities (create separate reports)
 - For reporting multiple vulnerabilities at once. Use a separate create_vulnerability_report for each vulnerability.
+- To re-report a vulnerability that was already reported (even with different details)
 
 White-box requirement (when you have access to the code): You MUST include code_file, code_before, code_after, and code_diff.
 These must contain the actual code (before/after) and a complete, apply-able unified diff.
 
+DEDUPLICATION: If this tool returns with success=false and mentions a duplicate, DO NOT attempt to re-submit. The vulnerability has already been reported. Move on to testing other areas.
+
 Professional, customer-facing report rules (PDF-ready):
 - Do NOT include internal or system details: never mention local or absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection issues, internal errors/logs/stack traces, or tester machine environment details.
 - Tone and style: formal, objective, third-person, vendor-neutral, concise. No runbooks, checklists, or engineering notes. Avoid headings like "QUICK", "Approach", or "Techniques" that read like internal guidance.
@@ -122,7 +127,9 @@ H = High (total loss of availability)
 
-Response containing success=true, message, report_id, severity, cvss_score
+Response containing:
+- On success: success=true, message, report_id, severity, cvss_score
+- On duplicate detection: success=false, message (with duplicate info), duplicate_of (ID), duplicate_title, confidence (0-1), reason (why it's a duplicate)