feat(reporting): add LLM-based vulnerability deduplication

- Add dedupe.py with XML-based LLM deduplication using direct litellm calls
- Integrate deduplication check in create_vulnerability_report tool
- Add get_existing_vulnerabilities() method to tracer for fetching reports
- Update schema and system prompt with deduplication guidelines
This commit is contained in:
0xallam
2026-01-07 19:23:17 -08:00
committed by Ahmed Allam
parent 0e9cd9b2a4
commit 01ae348da8
5 changed files with 268 additions and 1 deletions

View File

@@ -134,6 +134,7 @@ VALIDATION REQUIREMENTS:
- Keep going until you find something that matters
- A vulnerability is ONLY considered reported when a reporting agent uses create_vulnerability_report with full details. Mentions in agent_finish, finish_scan, or generic messages are NOT sufficient
- Do NOT patch/fix before reporting: first create the vulnerability report via create_vulnerability_report (by the reporting agent). Only after reporting is completed should fixing/patching proceed
- DEDUPLICATION: The create_vulnerability_report tool uses LLM-based deduplication. If it rejects your report as a duplicate, DO NOT attempt to re-submit the same vulnerability. Accept the rejection and move on to testing other areas. The vulnerability has already been reported by another agent
</execution_guidelines>
<vulnerability_focus>

217
strix/llm/dedupe.py Normal file
View File

@@ -0,0 +1,217 @@
import json
import logging
import os
import re
from typing import Any
import litellm
logger = logging.getLogger(__name__)
DEDUPE_SYSTEM_PROMPT = """You are an expert vulnerability report deduplication judge.
Your task is to determine if a candidate vulnerability report describes the SAME vulnerability
as any existing report.
CRITICAL DEDUPLICATION RULES:
1. SAME VULNERABILITY means:
- Same root cause (e.g., "missing input validation" not just "SQL injection")
- Same affected component/endpoint/file (exact match or clear overlap)
- Same exploitation method or attack vector
- Would be fixed by the same code change/patch
2. NOT DUPLICATES if:
- Different endpoints even with same vulnerability type (e.g., SQLi in /login vs /search)
- Different parameters in same endpoint (e.g., XSS in 'name' vs 'comment' field)
- Different root causes (e.g., stored XSS vs reflected XSS in same field)
- Different severity levels due to different impact
- One is authenticated, other is unauthenticated
3. ARE DUPLICATES even if:
- Titles are worded differently
- Descriptions have different level of detail
- PoC uses different payloads but exploits same issue
- One report is more thorough than another
- Minor variations in technical analysis
COMPARISON GUIDELINES:
- Focus on the technical root cause, not surface-level similarities
- Same vulnerability type (SQLi, XSS) doesn't mean duplicate - location matters
- Consider the fix: would fixing one also fix the other?
- When uncertain, lean towards NOT duplicate
FIELDS TO ANALYZE:
- title, description: General vulnerability info
- target, endpoint, method: Exact location of vulnerability
- technical_analysis: Root cause details
- poc_description: How it's exploited
- impact: What damage it can cause
YOU MUST RESPOND WITH EXACTLY THIS XML FORMAT AND NOTHING ELSE:
<dedupe_result>
<is_duplicate>true</is_duplicate>
<duplicate_id>vuln-0001</duplicate_id>
<confidence>0.95</confidence>
<reason>Both reports describe SQL injection in /api/login via the username parameter</reason>
</dedupe_result>
OR if not a duplicate:
<dedupe_result>
<is_duplicate>false</is_duplicate>
<duplicate_id></duplicate_id>
<confidence>0.90</confidence>
<reason>Different endpoints: candidate is /api/search, existing is /api/login</reason>
</dedupe_result>
RULES:
- is_duplicate MUST be exactly "true" or "false" (lowercase)
- duplicate_id MUST be the exact ID from existing reports or empty if not duplicate
- confidence MUST be a decimal (your confidence level in the decision)
- reason MUST be a specific explanation mentioning endpoint/parameter/root cause
- DO NOT include any text outside the <dedupe_result> tags"""
def _prepare_report_for_comparison(report: dict[str, Any]) -> dict[str, Any]:
relevant_fields = [
"id",
"title",
"description",
"impact",
"target",
"technical_analysis",
"poc_description",
"endpoint",
"method",
]
cleaned = {}
for field in relevant_fields:
if report.get(field):
value = report[field]
if isinstance(value, str) and len(value) > 2000:
value = value[:2000] + "...[truncated]"
cleaned[field] = value
return cleaned
def _extract_xml_field(content: str, field: str) -> str:
pattern = rf"<{field}>(.*?)</{field}>"
match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
return ""
def _parse_dedupe_response(content: str) -> dict[str, Any]:
result_match = re.search(
r"<dedupe_result>(.*?)</dedupe_result>", content, re.DOTALL | re.IGNORECASE
)
if not result_match:
logger.warning(f"No <dedupe_result> block found in response: {content[:500]}")
raise ValueError("No <dedupe_result> block found in response")
result_content = result_match.group(1)
is_duplicate_str = _extract_xml_field(result_content, "is_duplicate")
duplicate_id = _extract_xml_field(result_content, "duplicate_id")
confidence_str = _extract_xml_field(result_content, "confidence")
reason = _extract_xml_field(result_content, "reason")
is_duplicate = is_duplicate_str.lower() == "true"
try:
confidence = float(confidence_str) if confidence_str else 0.0
except ValueError:
confidence = 0.0
return {
"is_duplicate": is_duplicate,
"duplicate_id": duplicate_id[:64] if duplicate_id else "",
"confidence": confidence,
"reason": reason[:500] if reason else "",
}
def check_duplicate(
candidate: dict[str, Any], existing_reports: list[dict[str, Any]]
) -> dict[str, Any]:
if not existing_reports:
return {
"is_duplicate": False,
"duplicate_id": "",
"confidence": 1.0,
"reason": "No existing reports to compare against",
}
try:
candidate_cleaned = _prepare_report_for_comparison(candidate)
existing_cleaned = [_prepare_report_for_comparison(r) for r in existing_reports]
comparison_data = {"candidate": candidate_cleaned, "existing_reports": existing_cleaned}
model_name = os.getenv("STRIX_LLM", "openai/gpt-5")
api_key = os.getenv("LLM_API_KEY")
api_base = (
os.getenv("LLM_API_BASE")
or os.getenv("OPENAI_API_BASE")
or os.getenv("LITELLM_BASE_URL")
or os.getenv("OLLAMA_API_BASE")
)
messages = [
{"role": "system", "content": DEDUPE_SYSTEM_PROMPT},
{
"role": "user",
"content": (
f"Compare this candidate vulnerability against existing reports:\n\n"
f"{json.dumps(comparison_data, indent=2)}\n\n"
f"Respond with ONLY the <dedupe_result> XML block."
),
},
]
completion_kwargs: dict[str, Any] = {
"model": model_name,
"messages": messages,
"timeout": 120,
"temperature": 0,
}
if api_key:
completion_kwargs["api_key"] = api_key
if api_base:
completion_kwargs["api_base"] = api_base
response = litellm.completion(**completion_kwargs)
content = response.choices[0].message.content
if not content:
return {
"is_duplicate": False,
"duplicate_id": "",
"confidence": 0.0,
"reason": "Empty response from LLM",
}
result = _parse_dedupe_response(content)
logger.info(
f"Deduplication check: is_duplicate={result['is_duplicate']}, "
f"confidence={result['confidence']}, reason={result['reason'][:100]}"
)
except Exception as e:
logger.exception("Error during vulnerability deduplication check")
return {
"is_duplicate": False,
"duplicate_id": "",
"confidence": 0.0,
"reason": f"Deduplication check failed: {e}",
"error": str(e),
}
else:
return result

View File

@@ -145,6 +145,9 @@ class Tracer:
self.save_run_data()
return report_id
def get_existing_vulnerabilities(self) -> list[dict[str, Any]]:
return list(self.vulnerability_reports)
def update_scan_final_fields(
self,
executive_summary: str,

View File

@@ -157,6 +157,45 @@ def create_vulnerability_report(
tracer = get_global_tracer()
if tracer:
from strix.llm.dedupe import check_duplicate
existing_reports = tracer.get_existing_vulnerabilities()
candidate = {
"title": title,
"description": description,
"impact": impact,
"target": target,
"technical_analysis": technical_analysis,
"poc_description": poc_description,
"poc_script_code": poc_script_code,
"endpoint": endpoint,
"method": method,
}
dedupe_result = check_duplicate(candidate, existing_reports)
if dedupe_result.get("is_duplicate"):
duplicate_id = dedupe_result.get("duplicate_id", "")
duplicate_title = ""
for report in existing_reports:
if report.get("id") == duplicate_id:
duplicate_title = report.get("title", "Unknown")
break
return {
"success": False,
"message": (
f"Potential duplicate of '{duplicate_title}' "
f"(id={duplicate_id[:8]}...). Do not re-report the same vulnerability."
),
"duplicate_of": duplicate_id,
"duplicate_title": duplicate_title,
"confidence": dedupe_result.get("confidence", 0.0),
"reason": dedupe_result.get("reason", ""),
}
cvss_breakdown = {
"attack_vector": attack_vector,
"attack_complexity": attack_complexity,

View File

@@ -2,6 +2,8 @@
<tool name="create_vulnerability_report">
<description>Create a vulnerability report for a discovered security issue.
IMPORTANT: This tool includes automatic LLM-based deduplication. Reports that describe the same vulnerability (same root cause on the same asset) as an existing report will be rejected.
Use this tool to document a specific fully verified security vulnerability.
DO NOT USE:
@@ -10,9 +12,12 @@ DO NOT USE:
- When you don't have a proof of concept, or still not 100% sure if it's a vulnerability
- For tracking multiple vulnerabilities (create separate reports)
- For reporting multiple vulnerabilities at once. Use a separate create_vulnerability_report for each vulnerability.
- To re-report a vulnerability that was already reported (even with different details)
White-box requirement (when you have access to the code): You MUST include code_file, code_before, code_after, and code_diff. These must contain the actual code (before/after) and a complete, apply-able unified diff.
DEDUPLICATION: If this tool returns with success=false and mentions a duplicate, DO NOT attempt to re-submit. The vulnerability has already been reported. Move on to testing other areas.
Professional, customer-facing report rules (PDF-ready):
- Do NOT include internal or system details: never mention local or absolute paths (e.g., "/workspace"), internal tools, agents, orchestrators, sandboxes, models, system prompts/instructions, connection issues, internal errors/logs/stack traces, or tester machine environment details.
- Tone and style: formal, objective, third-person, vendor-neutral, concise. No runbooks, checklists, or engineering notes. Avoid headings like "QUICK", "Approach", or "Techniques" that read like internal guidance.
@@ -122,7 +127,9 @@ H = High (total loss of availability)</description>
</parameter>
</parameters>
<returns type="Dict[str, Any]">
<description>Response containing success=true, message, report_id, severity, cvss_score</description>
<description>Response containing:
- On success: success=true, message, report_id, severity, cvss_score
- On duplicate detection: success=false, message (with duplicate info), duplicate_of (ID), duplicate_title, confidence (0-1), reason (why it's a duplicate)</description>
</returns>
</tool>
</tools>