diff --git a/containers/Dockerfile b/containers/Dockerfile index 0ece11e..b40c8dc 100644 --- a/containers/Dockerfile +++ b/containers/Dockerfile @@ -101,6 +101,33 @@ RUN npm install -g retire@latest && \ npm install -g @ast-grep/cli@latest && \ npm install -g tree-sitter-cli@latest +RUN set -eux; \ + TS_PARSER_DIR="/home/pentester/.tree-sitter/parsers"; \ + mkdir -p "${TS_PARSER_DIR}"; \ + for repo in tree-sitter-java tree-sitter-javascript tree-sitter-python tree-sitter-go tree-sitter-bash tree-sitter-json tree-sitter-yaml tree-sitter-typescript; do \ + if [ "$repo" = "tree-sitter-yaml" ]; then \ + repo_url="https://github.com/tree-sitter-grammars/${repo}.git"; \ + else \ + repo_url="https://github.com/tree-sitter/${repo}.git"; \ + fi; \ + if [ ! -d "${TS_PARSER_DIR}/${repo}" ]; then \ + git clone --depth 1 "${repo_url}" "${TS_PARSER_DIR}/${repo}"; \ + fi; \ + done; \ + if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" ]; then \ + ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" "${TS_PARSER_DIR}/tree-sitter-typescript-typescript"; \ + fi; \ + if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" ]; then \ + ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" "${TS_PARSER_DIR}/tree-sitter-typescript-tsx"; \ + fi; \ + tree-sitter init-config >/dev/null 2>&1 || true; \ + TS_CONFIG="/home/pentester/.config/tree-sitter/config.json"; \ + mkdir -p "$(dirname "${TS_CONFIG}")"; \ + [ -f "${TS_CONFIG}" ] || printf '{}\n' > "${TS_CONFIG}"; \ + TMP_CFG="$(mktemp)"; \ + jq --arg p "${TS_PARSER_DIR}" '.["parser-directories"] = ((.["parser-directories"] // []) + [$p] | unique)' "${TS_CONFIG}" > "${TMP_CFG}"; \ + mv "${TMP_CFG}" "${TS_CONFIG}" + WORKDIR /home/pentester/tools RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \ chmod +x JS-Snooper/js_snooper.sh && \ @@ -112,7 +139,18 @@ RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \ USER root RUN curl -sSfL 
https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin -RUN curl -sSfL https://raw.githubusercontent.com/gitleaks/gitleaks/master/install.sh | sh -s -- -b /usr/local/bin +RUN set -eux; \ + ARCH="$(uname -m)"; \ + case "$ARCH" in \ + x86_64) GITLEAKS_ARCH="x64" ;; \ + aarch64|arm64) GITLEAKS_ARCH="arm64" ;; \ + *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \ + esac; \ + TAG="$(curl -fsSL https://api.github.com/repos/gitleaks/gitleaks/releases/latest | jq -r .tag_name)"; \ + curl -fsSL "https://github.com/gitleaks/gitleaks/releases/download/${TAG}/gitleaks_${TAG#v}_linux_${GITLEAKS_ARCH}.tar.gz" -o /tmp/gitleaks.tgz; \ + tar -xzf /tmp/gitleaks.tgz -C /tmp; \ + install -m 0755 /tmp/gitleaks /usr/local/bin/gitleaks; \ + rm -f /tmp/gitleaks /tmp/gitleaks.tgz RUN apt-get update && apt-get install -y zaproxy diff --git a/docs/tools/sandbox.mdx b/docs/tools/sandbox.mdx index 3c815ec..386452f 100644 --- a/docs/tools/sandbox.mdx +++ b/docs/tools/sandbox.mdx @@ -51,7 +51,7 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o | ------------------------------------------------------- | --------------------------------------------- | | [Semgrep](https://github.com/semgrep/semgrep) | Fast SAST and custom rule matching | | [ast-grep](https://ast-grep.github.io) | Structural AST/CST-aware code search (`sg`) | -| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction | +| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction (Java/JS/TS/Python/Go/Bash/JSON/YAML grammars pre-configured) | | [Bandit](https://bandit.readthedocs.io) | Python security linter | ## Secret Detection diff --git a/strix/agents/StrixAgent/system_prompt.jinja b/strix/agents/StrixAgent/system_prompt.jinja index 2097aae..b620921 100644 --- a/strix/agents/StrixAgent/system_prompt.jinja +++ 
b/strix/agents/StrixAgent/system_prompt.jinja @@ -91,7 +91,11 @@ BLACK-BOX TESTING (domain/subdomain only): WHITE-BOX TESTING (code provided): - MUST perform BOTH static AND dynamic analysis - Static: Use source-aware triage first to map risk quickly (`semgrep`, `ast-grep`, Tree-sitter tooling, `gitleaks`, `trufflehog`, `trivy fs`). Then review code for vulnerabilities -- Shared memory: Use notes as shared working memory; check existing `wiki` notes first (`list_notes`), then update one repo wiki note instead of creating duplicates +- Static coverage floor: execute at least one structural AST mapping pass (`sg` and/or Tree-sitter) per repository and keep artifact output +- Static coverage target per repository: run one `semgrep` pass, one secrets pass (`gitleaks` and/or `trufflehog`), one `trivy fs` pass, and one AST-structural pass (`sg` and/or Tree-sitter); if any are skipped, record why in the shared wiki +- Keep AST artifacts bounded and high-signal: scope to relevant paths/hypotheses, avoid whole-repo generic function dumps +- Shared memory: Use notes as shared working memory; discover wiki notes with `list_notes`, then read the selected one via `get_note(note_id=...)` before analysis +- Before `agent_finish`/`finish_scan`, update the shared repo wiki with scanner summaries, key routes/sinks, and dynamic follow-up plan - Dynamic: Run the application and test live to validate exploitability - NEVER rely solely on static code analysis when dynamic validation is possible - Begin with fast source triage and dynamic run preparation in parallel; use static findings to prioritize live testing. 
diff --git a/strix/interface/tool_components/notes_renderer.py b/strix/interface/tool_components/notes_renderer.py index f4fc1a3..4a410c2 100644 --- a/strix/interface/tool_components/notes_renderer.py +++ b/strix/interface/tool_components/notes_renderer.py @@ -117,6 +117,8 @@ class ListNotesRenderer(BaseToolRenderer): title = note.get("title", "").strip() or "(untitled)" category = note.get("category", "general") note_content = note.get("content", "").strip() + if not note_content: + note_content = note.get("content_preview", "").strip() text.append("\n - ") text.append(title) @@ -131,3 +133,35 @@ class ListNotesRenderer(BaseToolRenderer): css_classes = cls.get_css_classes("completed") return Static(text, classes=css_classes) + + +@register_tool_renderer +class GetNoteRenderer(BaseToolRenderer): + tool_name: ClassVar[str] = "get_note" + css_classes: ClassVar[list[str]] = ["tool-call", "notes-tool"] + + @classmethod + def render(cls, tool_data: dict[str, Any]) -> Static: + result = tool_data.get("result") + + text = Text() + text.append("◇ ", style="#fbbf24") + text.append("note read", style="dim") + + if result and isinstance(result, dict) and result.get("success"): + note = result.get("note", {}) or {} + title = str(note.get("title", "")).strip() or "(untitled)" + category = note.get("category", "general") + content = str(note.get("content", "")).strip() + text.append("\n ") + text.append(title) + text.append(f" ({category})", style="dim") + if content: + text.append("\n ") + text.append(content, style="dim") + else: + text.append("\n ") + text.append("Loading...", style="dim") + + css_classes = cls.get_css_classes("completed") + return Static(text, classes=css_classes) diff --git a/strix/llm/llm.py b/strix/llm/llm.py index 6387f6e..6eee60e 100644 --- a/strix/llm/llm.py +++ b/strix/llm/llm.py @@ -107,6 +107,7 @@ class LLM: ordered_skills.append(f"scan_modes/{self.config.scan_mode}") if self.config.is_whitebox: 
ordered_skills.append("coordination/source_aware_whitebox") + ordered_skills.append("source_aware_sast") deduped: list[str] = [] seen: set[str] = set() diff --git a/strix/skills/coordination/source_aware_whitebox.md b/strix/skills/coordination/source_aware_whitebox.md index c4943cc..037b180 100644 --- a/strix/skills/coordination/source_aware_whitebox.md +++ b/strix/skills/coordination/source_aware_whitebox.md @@ -9,11 +9,11 @@ Use this coordination playbook when repository source code is available. ## Objective -Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is recommended by default, but not mandatory when context suggests a better path. +Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is expected by default when source is available. ## Recommended Workflow -1. Build a quick source map before deep exploitation. +1. Build a quick source map before deep exploitation, including at least one AST-structural pass (`sg` or `tree-sitter`) scoped to relevant paths. 2. Run first-pass static triage to rank high-risk paths. 3. Use triage outputs to prioritize dynamic PoC validation. 4. Keep findings evidence-driven: no report without validation. @@ -27,6 +27,13 @@ Increase white-box coverage by combining source-aware triage with dynamic valida - `gitleaks` + `trufflehog`: complementary secret detection (working tree and history coverage) - `trivy fs`: dependency, misconfiguration, license, and secret checks +Coverage target per repository: +- one `semgrep` pass +- one AST structural pass (`sg` and/or `tree-sitter`) +- one secrets pass (`gitleaks` and/or `trufflehog`) +- one `trivy fs` pass +- if any part is skipped, log the reason in the shared wiki note + ## Agent Delegation Guidance - Keep child agents specialized by vulnerability/component as usual. 
@@ -38,10 +45,11 @@ Increase white-box coverage by combining source-aware triage with dynamic valida When source is present, maintain one wiki note per repository and keep it current. Operational rules: -- At task start, call `list_notes` with `category=wiki` and reuse existing repo wiki if present. +- At task start, call `list_notes` with `category=wiki`, then read the selected wiki with `get_note(note_id=...)`. - If no repo wiki exists, create one with `create_note` and `category=wiki`. - Update the same wiki via `update_note`; avoid creating duplicate wiki notes for the same repo. -- Child agents should read wiki notes first, then extend with new evidence from their scope. +- Child agents should read wiki notes first via `get_note`, then extend with new evidence from their scope. +- Before calling `agent_finish`, each source-focused child agent should append a short delta update to the shared repo wiki (scanner outputs, route/sink map deltas, dynamic follow-ups). Recommended sections: - Architecture overview diff --git a/strix/skills/custom/source_aware_sast.md b/strix/skills/custom/source_aware_sast.md index dc9d058..9fcaf3b 100644 --- a/strix/skills/custom/source_aware_sast.md +++ b/strix/skills/custom/source_aware_sast.md @@ -19,16 +19,42 @@ Before scanning, check shared wiki memory: ```text 1) list_notes(category="wiki") -2) Reuse matching repo wiki note if present -3) create_note(category="wiki") only if missing +2) If a matching repo wiki note exists, read it with get_note(note_id=...) before analysis +3) Reuse that same note for all later updates instead of creating another +4) create_note(category="wiki") only if missing ``` +After every major source-analysis batch, update the same repo wiki note with `update_note` so other agents can reuse your latest map. 
+ +## Baseline Coverage Bundle (Recommended) + +Run this baseline once per repository before deep narrowing (swap `p/golang` for the Semgrep pack that matches the repository's languages): + +```bash +ART=/workspace/.strix-source-aware +mkdir -p "$ART" + +semgrep scan --config p/default --config p/golang --config p/secrets \ + --metrics=off --json --output "$ART/semgrep.json" . +sg scan --json . > "$ART/ast-grep.json" +gitleaks detect --source . --report-format json --report-path "$ART/gitleaks.json" || true +trufflehog filesystem --no-update --json --no-verification . > "$ART/trufflehog.json" || true +trivy fs --format json --output "$ART/trivy-fs.json" . +``` + +If one tool is skipped or fails, record that in the shared wiki note along with the reason. + ## Semgrep First Pass Use Semgrep as the default static triage pass: ```bash -semgrep --config auto --json --output /workspace/.strix-source-aware/semgrep.json . +# Preferred deterministic profile set (works with --metrics=off); replace p/golang with the pack for the repo's languages +semgrep scan --config p/default --config p/golang --config p/secrets \ + --metrics=off --json --output /workspace/.strix-source-aware/semgrep.json . + +# If you choose auto config, do not combine it with --metrics=off +semgrep scan --config auto --json --output /workspace/.strix-source-aware/semgrep-auto.json . ``` If diff scope is active, restrict to changed files first, then expand only when needed. @@ -92,6 +118,11 @@ Keep one wiki note per repository and update these sections: ## Dynamic Validation Follow-Ups ``` +Before `agent_finish`, make one final `update_note` call to capture: +- scanner artifacts and paths +- top validated/invalidated hypotheses +- concrete dynamic follow-up tasks + ## Anti-Patterns - Do not treat scanner output as final truth. diff --git a/strix/skills/scan_modes/deep.md b/strix/skills/scan_modes/deep.md index f6fe11b..a2687fe 100644 --- a/strix/skills/scan_modes/deep.md +++ b/strix/skills/scan_modes/deep.md @@ -15,8 +15,10 @@ Thorough understanding before exploitation. 
Test every parameter, every endpoint **Whitebox (source available)** - Map every file, module, and code path in the repository -- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")`), then continuously update one repo note +- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")` then `get_note(note_id=...)`), then continuously update one repo note - Start with broad source-aware triage (`semgrep`, `ast-grep`, `gitleaks`, `trufflehog`, `trivy fs`) and use outputs to drive deep review +- Execute at least one structural AST pass (`sg` and/or Tree-sitter) per repository and store artifacts for reuse +- Keep AST artifacts bounded and query-driven (target relevant paths/sinks first; avoid whole-repo generic function dumps) - Use syntax-aware parsing (Tree-sitter tooling) to improve symbol, route, and sink extraction quality - Trace all entry points from HTTP handlers to database queries - Document all authentication mechanisms and implementations @@ -29,6 +31,7 @@ Thorough understanding before exploitation. Test every parameter, every endpoint - Review file handling: upload, download, processing - Understand the deployment model and infrastructure assumptions - Check all dependency versions and repository risks against CVE/misconfiguration data +- Before final completion, update the shared repo wiki with scanner summary + dynamic follow-ups **Blackbox (no source)** - Exhaustive subdomain enumeration with multiple sources and tools diff --git a/strix/skills/scan_modes/quick.md b/strix/skills/scan_modes/quick.md index 14cf860..7e8f36f 100644 --- a/strix/skills/scan_modes/quick.md +++ b/strix/skills/scan_modes/quick.md @@ -15,12 +15,15 @@ Optimize for fast feedback on critical security issues. 
Skip exhaustive enumerat **Whitebox (source available)** - Focus on recent changes: git diffs, new commits, modified files—these are most likely to contain fresh bugs -- Read existing `wiki` notes first (`list_notes(category="wiki")`) to avoid remapping from scratch +- Read existing `wiki` notes first (`list_notes(category="wiki")` then `get_note(note_id=...)`) to avoid remapping from scratch - Run a fast static triage on changed files first (`semgrep`, then targeted `sg` queries) +- Run at least one lightweight AST pass (`sg` or Tree-sitter) so structural mapping is not skipped +- Keep AST commands tightly scoped to changed or high-risk paths; avoid broad repository-wide pattern dumps - Run quick secret and dependency checks (`gitleaks`, `trufflehog`, `trivy fs`) scoped to changed areas when possible - Identify security-sensitive patterns in changed code: auth checks, input handling, database queries, file operations - Trace user input through modified code paths - Check if security controls were modified or bypassed +- Before completion, update the shared repo wiki with what changed and what needs dynamic follow-up **Blackbox (no source)** - Map authentication and critical user flows diff --git a/strix/skills/scan_modes/standard.md b/strix/skills/scan_modes/standard.md index 773dab1..13f3f70 100644 --- a/strix/skills/scan_modes/standard.md +++ b/strix/skills/scan_modes/standard.md @@ -15,15 +15,17 @@ Systematic testing across the full attack surface. 
Understand the application be **Whitebox (source available)** - Map codebase structure: modules, entry points, routing -- Start by loading existing `wiki` notes (`list_notes(category="wiki")`) and update one shared repo note as mapping evolves +- Start by loading existing `wiki` notes (`list_notes(category="wiki")` then `get_note(note_id=...)`) and update one shared repo note as mapping evolves - Run `semgrep` first-pass triage to prioritize risky flows before deep manual review -- Use `ast-grep`/Tree-sitter-assisted structural search for route, sink, and trust-boundary mapping +- Run at least one AST-structural mapping pass (`sg` and/or Tree-sitter), then use outputs for route, sink, and trust-boundary mapping +- Keep AST output bounded to relevant paths and hypotheses; avoid whole-repo generic function dumps - Identify architecture pattern (MVC, microservices, monolith) - Trace input vectors: forms, APIs, file uploads, headers, cookies - Review authentication and authorization flows - Analyze database interactions and ORM usage - Check dependencies and repo risks with `trivy fs`, `gitleaks`, and `trufflehog` - Understand the data model and sensitive data locations +- Before completion, update the shared repo wiki with source findings summary and dynamic validation next steps **Blackbox (no source)** - Crawl application thoroughly, interact with every feature diff --git a/strix/tools/agents_graph/agents_graph_actions.py b/strix/tools/agents_graph/agents_graph_actions.py index c8a98e7..cfe6ed0 100644 --- a/strix/tools/agents_graph/agents_graph_actions.py +++ b/strix/tools/agents_graph/agents_graph_actions.py @@ -1,5 +1,6 @@ import threading from datetime import UTC, datetime +import re from typing import Any, Literal from strix.tools.registry import register_tool @@ -21,6 +22,142 @@ _agent_instances: dict[str, Any] = {} _agent_states: dict[str, Any] = {} +def _is_whitebox_agent(agent_id: str) -> bool: + agent = _agent_instances.get(agent_id) + return 
bool(getattr(getattr(agent, "llm_config", None), "is_whitebox", False)) + + +def _extract_repo_tags(agent_state: Any | None) -> set[str]: + repo_tags: set[str] = set() + if agent_state is None: + return repo_tags + + task_text = str(getattr(agent_state, "task", "") or "") + for workspace_subdir in re.findall(r"/workspace/([A-Za-z0-9._-]+)", task_text): + repo_tags.add(f"repo:{workspace_subdir.lower()}") + + for repo_name in re.findall(r"github\.com/[^/\s]+/([A-Za-z0-9._-]+)", task_text): + normalized = repo_name.removesuffix(".git").lower() + if normalized: + repo_tags.add(f"repo:{normalized}") + + return repo_tags + + +def _load_primary_wiki_note(agent_state: Any | None = None) -> dict[str, Any] | None: + try: + from strix.tools.notes.notes_actions import get_note, list_notes + + notes_result = list_notes(category="wiki") + if not notes_result.get("success"): + return None + + notes = notes_result.get("notes") or [] + if not notes: + return None + + selected_note_id = None + repo_tags = _extract_repo_tags(agent_state) + if repo_tags: + for note in notes: + note_tags = note.get("tags") or [] + if not isinstance(note_tags, list): + continue + normalized_note_tags = {str(tag).strip().lower() for tag in note_tags if str(tag).strip()} + if normalized_note_tags.intersection(repo_tags): + selected_note_id = note.get("note_id") + break + + note_id = selected_note_id or notes[0].get("note_id") + if not isinstance(note_id, str) or not note_id: + return None + + note_result = get_note(note_id=note_id) + if not note_result.get("success"): + return None + + note = note_result.get("note") + if not isinstance(note, dict): + return None + + except Exception: + return None + else: + return note + + +def _inject_wiki_context_for_whitebox(agent_state: Any) -> None: + if not _is_whitebox_agent(agent_state.agent_id): + return + + wiki_note = _load_primary_wiki_note(agent_state) + if not wiki_note: + return + + title = str(wiki_note.get("title") or "repo wiki") + content = 
str(wiki_note.get("content") or "").strip() + if not content: + return + + max_chars = 4000 + truncated_content = content[:max_chars] + suffix = "\n\n[truncated for context size]" if len(content) > max_chars else "" + agent_state.add_message( + "user", + ( + f"\n" + f"{truncated_content}{suffix}\n" + "" + ), + ) + + +def _append_wiki_update_on_finish( + agent_state: Any, + agent_name: str, + result_summary: str, + findings: list[str] | None, + final_recommendations: list[str] | None, +) -> None: + if not _is_whitebox_agent(agent_state.agent_id): + return + + try: + from strix.tools.notes.notes_actions import update_note + + note = _load_primary_wiki_note(agent_state) + if not note: + return + + note_id = note.get("note_id") + if not isinstance(note_id, str) or not note_id: + return + + existing_content = str(note.get("content") or "") + timestamp = datetime.now(UTC).isoformat() + summary = " ".join(str(result_summary).split()) + if len(summary) > 1200: + summary = f"{summary[:1197]}..." + findings_lines = "\n".join(f"- {item}" for item in (findings or [])) or "- none" + recommendation_lines = ( + "\n".join(f"- {item}" for item in (final_recommendations or [])) or "- none" + ) + + delta = ( + f"\n\n## Agent Update: {agent_name} ({timestamp})\n" + f"Summary: {summary}\n\n" + "Findings:\n" + f"{findings_lines}\n\n" + "Recommendations:\n" + f"{recommendation_lines}\n" + ) + updated_content = f"{existing_content.rstrip()}{delta}" + update_note(note_id=note_id, content=updated_content) + except Exception: + # Best-effort update; never block agent completion on note persistence. 
+ return + + def _run_agent_in_thread( agent: Any, state: Any, inherited_messages: list[dict[str, Any]] ) -> dict[str, Any]: @@ -31,6 +168,8 @@ def _run_agent_in_thread( state.add_message(msg["role"], msg["content"]) state.add_message("user", "") + _inject_wiki_context_for_whitebox(state) + parent_info = _agent_graph["nodes"].get(state.parent_id, {}) parent_name = parent_info.get("name", "Unknown Parent") @@ -42,9 +181,14 @@ def _run_agent_in_thread( wiki_memory_instruction = "" if getattr(getattr(agent, "llm_config", None), "is_whitebox", False): wiki_memory_instruction = ( - '\n - White-box memory: call list_notes(category="wiki") early, ' - "reuse existing repo wiki notes, and update the same note instead of " - "creating duplicates" + '\n - White-box memory (recommended): call list_notes(category="wiki") and then ' + "get_note(note_id=...) before substantive work (including terminal scans)" + "\n - Reuse one repo wiki note where possible and avoid duplicates" + "\n - Before agent_finish, call list_notes(category=\"wiki\") + get_note(note_id=...) 
again, then append a short scope delta via update_note (new routes/sinks, scanner results, dynamic follow-ups)" + "\n - If terminal output contains `command not found` or shell parse errors, correct and rerun before using the result" + "\n - Use ASCII-only shell commands; if a command includes unexpected non-ASCII characters, rerun with a clean ASCII command" + "\n - Keep AST artifacts bounded: target relevant paths and avoid whole-repo generic function dumps" + "\n - Source-aware tooling is advisory: choose semgrep/AST/tree-sitter/gitleaks/trivy when relevant, do not force static steps for purely dynamic validation tasks" ) task_xml = f""" @@ -232,8 +376,23 @@ def create_agent( if hasattr(parent_agent.llm_config, "is_whitebox"): is_whitebox = parent_agent.llm_config.is_whitebox interactive = getattr(parent_agent.llm_config, "interactive", False) - if hasattr(parent_agent.llm_config, "is_whitebox"): - is_whitebox = parent_agent.llm_config.is_whitebox + + if is_whitebox: + whitebox_guidance = ( + "\n\nWhite-box execution guidance (recommended when source is available):\n" + "- Use structural AST mapping (`sg` or `tree-sitter`) where it helps source analysis; " + "keep artifacts bounded and skip forced AST steps for purely dynamic validation tasks.\n" + "- Keep AST output bounded: scope to relevant paths/files, avoid whole-repo " + "generic function patterns, and cap artifact size.\n" + '- Use shared wiki memory by calling list_notes(category="wiki") then ' + "get_note(note_id=...).\n" + '- Before agent_finish, call list_notes(category="wiki") + get_note(note_id=...) ' + "again, reuse one repo wiki, and call update_note.\n" + "- If terminal output contains `command not found` or shell parse errors, " + "correct and rerun before using the result." 
+ ) + if "White-box execution guidance (recommended when source is available):" not in task: + task = f"{task.rstrip()}{whitebox_guidance}" state = AgentState( task=task, @@ -395,6 +554,14 @@ def agent_finish( "recommendations": final_recommendations or [], } + _append_wiki_update_on_finish( + agent_state=agent_state, + agent_name=agent_node["name"], + result_summary=result_summary, + findings=findings, + final_recommendations=final_recommendations, + ) + parent_notified = False if report_to_parent and agent_node["parent_id"]: diff --git a/strix/tools/notes/__init__.py b/strix/tools/notes/__init__.py index ebcbbca..8d14123 100644 --- a/strix/tools/notes/__init__.py +++ b/strix/tools/notes/__init__.py @@ -1,6 +1,7 @@ from .notes_actions import ( create_note, delete_note, + get_note, list_notes, update_note, ) @@ -9,6 +10,7 @@ from .notes_actions import ( __all__ = [ "create_note", "delete_note", + "get_note", "list_notes", "update_note", ] diff --git a/strix/tools/notes/notes_actions.py b/strix/tools/notes/notes_actions.py index 6364dd7..2622cc2 100644 --- a/strix/tools/notes/notes_actions.py +++ b/strix/tools/notes/notes_actions.py @@ -12,6 +12,7 @@ _notes_storage: dict[str, dict[str, Any]] = {} _VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "wiki"] _notes_lock = threading.RLock() _loaded_notes_run_dir: str | None = None +_DEFAULT_CONTENT_PREVIEW_CHARS = 280 def _get_run_dir() -> Path | None: @@ -204,6 +205,38 @@ def _filter_notes( return filtered_notes +def _to_note_listing_entry( + note: dict[str, Any], + *, + include_content: bool = False, +) -> dict[str, Any]: + entry = { + "note_id": note.get("note_id"), + "title": note.get("title", ""), + "category": note.get("category", "general"), + "tags": note.get("tags", []), + "created_at": note.get("created_at", ""), + "updated_at": note.get("updated_at", ""), + } + + wiki_filename = note.get("wiki_filename") + if isinstance(wiki_filename, str) and wiki_filename: + 
entry["wiki_filename"] = wiki_filename + + content = str(note.get("content", "")) + if include_content: + entry["content"] = content + elif content: + if len(content) > _DEFAULT_CONTENT_PREVIEW_CHARS: + entry["content_preview"] = ( + f"{content[:_DEFAULT_CONTENT_PREVIEW_CHARS].rstrip()}..." + ) + else: + entry["content_preview"] = content + + return entry + + @register_tool(sandbox_execution=False) def create_note( # noqa: PLR0911 title: str, @@ -272,15 +305,20 @@ def list_notes( category: str | None = None, tags: list[str] | None = None, search: str | None = None, + include_content: bool = False, ) -> dict[str, Any]: with _notes_lock: try: filtered_notes = _filter_notes(category=category, tags=tags, search_query=search) + notes = [ + _to_note_listing_entry(note, include_content=include_content) + for note in filtered_notes + ] return { "success": True, - "notes": filtered_notes, - "total_count": len(filtered_notes), + "notes": notes, + "total_count": len(notes), } except (ValueError, TypeError) as e: @@ -292,6 +330,40 @@ def list_notes( } +@register_tool(sandbox_execution=False) +def get_note(note_id: str) -> dict[str, Any]: + with _notes_lock: + try: + _ensure_notes_loaded() + + if not note_id or not note_id.strip(): + return { + "success": False, + "error": "Note ID cannot be empty", + "note": None, + } + + note = _notes_storage.get(note_id) + if note is None: + return { + "success": False, + "error": f"Note with ID '{note_id}' not found", + "note": None, + } + + note_with_id = note.copy() + note_with_id["note_id"] = note_id + + except (ValueError, TypeError) as e: + return { + "success": False, + "error": f"Failed to get note: {e}", + "note": None, + } + else: + return {"success": True, "note": note_with_id} + + @register_tool(sandbox_execution=False) def update_note( note_id: str, diff --git a/strix/tools/notes/notes_actions_schema.xml b/strix/tools/notes/notes_actions_schema.xml index 0329187..3b186a5 100644 --- a/strix/tools/notes/notes_actions_schema.xml 
+++ b/strix/tools/notes/notes_actions_schema.xml @@ -93,7 +93,7 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic - List existing notes with optional filtering and search. + List existing notes with optional filtering and search (metadata-first by default). Filter by category @@ -104,9 +104,12 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic Search query to find in note titles and content + + Include full note content in each list item (default: false) + - Response containing: - notes: List of matching notes - total_count: Total number of notes found + Response containing: - notes: List of matching notes (metadata + optional content/content_preview) - total_count: Total number of notes found # List all findings @@ -131,6 +134,23 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic + + Get a single note by ID, including full content. + + + ID of the note to fetch + + + + Response containing: - note: Note object including content - success: Whether note lookup succeeded + + + # Read a specific wiki note after listing note IDs + + abc12 + + + Update an existing note. diff --git a/tests/llm/test_source_aware_whitebox.py b/tests/llm/test_source_aware_whitebox.py index 2b22ae6..c43a5c4 100644 --- a/tests/llm/test_source_aware_whitebox.py +++ b/tests/llm/test_source_aware_whitebox.py @@ -19,6 +19,7 @@ def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> N whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=True), agent_name="StrixAgent") assert "" in whitebox_llm.system_prompt + assert "" in whitebox_llm.system_prompt assert "Begin with fast source triage" in whitebox_llm.system_prompt assert "You MUST begin at the very first step by running the code and testing live." 
not in ( whitebox_llm.system_prompt @@ -26,3 +27,4 @@ def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> N non_whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=False), agent_name="StrixAgent") assert "" not in non_whitebox_llm.system_prompt + assert "" not in non_whitebox_llm.system_prompt diff --git a/tests/tools/test_agents_graph_whitebox.py b/tests/tools/test_agents_graph_whitebox.py index ac98163..a8205cb 100644 --- a/tests/tools/test_agents_graph_whitebox.py +++ b/tests/tools/test_agents_graph_whitebox.py @@ -58,6 +58,9 @@ def test_create_agent_inherits_parent_whitebox_flag(monkeypatch) -> None: assert llm_config.timeout == 123 assert llm_config.scan_mode == "standard" assert llm_config.is_whitebox is True + child_task = captured_config["agent_config"]["state"].task + assert "White-box execution guidance (recommended when source is available):" in child_task + assert "mandatory" not in child_task.lower() def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypatch) -> None: @@ -99,9 +102,197 @@ def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypa state = FakeState() agent = FakeAgent() + agents_graph_actions._agent_instances[child_id] = agent result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[]) assert result["result"] == {"ok": True} task_messages = [msg for role, msg in state.messages if role == "user"] assert task_messages assert 'list_notes(category="wiki")' in task_messages[-1] + assert "get_note(note_id=...)" in task_messages[-1] + assert "Before agent_finish" in task_messages[-1] + + +def test_agent_finish_appends_wiki_update_for_whitebox(monkeypatch) -> None: + monkeypatch.setenv("STRIX_LLM", "openai/gpt-5") + + agents_graph_actions._agent_graph["nodes"].clear() + agents_graph_actions._agent_graph["edges"].clear() + agents_graph_actions._agent_messages.clear() + agents_graph_actions._running_agents.clear() + 
agents_graph_actions._agent_instances.clear() + agents_graph_actions._agent_states.clear() + + parent_id = "parent-2" + child_id = "child-2" + agents_graph_actions._agent_graph["nodes"][parent_id] = { + "name": "Parent", + "task": "parent task", + "status": "running", + "parent_id": None, + } + agents_graph_actions._agent_graph["nodes"][child_id] = { + "name": "Child", + "task": "child task", + "status": "running", + "parent_id": parent_id, + } + agents_graph_actions._agent_instances[child_id] = SimpleNamespace( + llm_config=LLMConfig(is_whitebox=True) + ) + + captured: dict[str, str] = {} + + def fake_list_notes(category=None): + assert category == "wiki" + return { + "success": True, + "notes": [{"note_id": "wiki-note-1", "content": "Existing wiki content"}], + "total_count": 1, + } + + captured_get: dict[str, str] = {} + + def fake_get_note(note_id: str): + captured_get["note_id"] = note_id + return { + "success": True, + "note": { + "note_id": note_id, + "title": "Repo Wiki", + "content": "Existing wiki content", + }, + } + + def fake_update_note(note_id: str, content: str): + captured["note_id"] = note_id + captured["content"] = content + return {"success": True, "note_id": note_id} + + monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes) + monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note) + monkeypatch.setattr("strix.tools.notes.notes_actions.update_note", fake_update_note) + + state = SimpleNamespace(agent_id=child_id, parent_id=parent_id) + result = agents_graph_actions.agent_finish( + agent_state=state, + result_summary="AST pass completed", + findings=["Found route sink candidate"], + success=True, + final_recommendations=["Validate sink with dynamic PoC"], + ) + + assert result["agent_completed"] is True + assert captured_get["note_id"] == "wiki-note-1" + assert captured["note_id"] == "wiki-note-1" + assert "Agent Update: Child" in captured["content"] + assert "AST pass completed" in 
captured["content"] + + +def test_run_agent_in_thread_injects_shared_wiki_context_in_whitebox(monkeypatch) -> None: + monkeypatch.setenv("STRIX_LLM", "openai/gpt-5") + + agents_graph_actions._agent_graph["nodes"].clear() + agents_graph_actions._agent_graph["edges"].clear() + agents_graph_actions._agent_messages.clear() + agents_graph_actions._running_agents.clear() + agents_graph_actions._agent_instances.clear() + agents_graph_actions._agent_states.clear() + + parent_id = "parent-3" + child_id = "child-3" + agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"} + agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"} + + class FakeState: + def __init__(self) -> None: + self.agent_id = child_id + self.agent_name = "Child" + self.parent_id = parent_id + self.task = "map source" + self.stop_requested = False + self.messages: list[tuple[str, str]] = [] + + def add_message(self, role: str, content: str) -> None: + self.messages.append((role, content)) + + def model_dump(self) -> dict[str, str]: + return {"agent_id": self.agent_id} + + class FakeAgent: + def __init__(self) -> None: + self.llm_config = LLMConfig(is_whitebox=True) + + async def agent_loop(self, _task: str) -> dict[str, bool]: + return {"ok": True} + + captured_get: dict[str, str] = {} + + def fake_list_notes(category=None): + assert category == "wiki" + return { + "success": True, + "notes": [{"note_id": "wiki-ctx-1"}], + "total_count": 1, + } + + def fake_get_note(note_id: str): + captured_get["note_id"] = note_id + return { + "success": True, + "note": { + "note_id": note_id, + "title": "Shared Repo Wiki", + "content": "Architecture: server/client split", + }, + } + + monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes) + monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note) + + state = FakeState() + agent = FakeAgent() + agents_graph_actions._agent_instances[child_id] = 
agent + result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[]) + + assert result["result"] == {"ok": True} + assert captured_get["note_id"] == "wiki-ctx-1" + user_messages = [content for role, content in state.messages if role == "user"] + assert user_messages + assert " None: + selected_note_ids: list[str] = [] + + def fake_list_notes(category=None): + assert category == "wiki" + return { + "success": True, + "notes": [ + {"note_id": "wiki-other", "tags": ["repo:other"]}, + {"note_id": "wiki-target", "tags": ["repo:appsmith"]}, + ], + "total_count": 2, + } + + def fake_get_note(note_id: str): + selected_note_ids.append(note_id) + return { + "success": True, + "note": {"note_id": note_id, "title": "Repo Wiki", "content": "content"}, + } + + monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes) + monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note) + + agent_state = SimpleNamespace( + task="analyze /workspace/appsmith", + context={"whitebox_repo_tags": ["repo:appsmith"]}, + ) + note = agents_graph_actions._load_primary_wiki_note(agent_state) + + assert note is not None + assert note["note_id"] == "wiki-target" + assert selected_note_ids == ["wiki-target"] diff --git a/tests/tools/test_notes_wiki.py b/tests/tools/test_notes_wiki.py index 06725c1..381c064 100644 --- a/tests/tools/test_notes_wiki.py +++ b/tests/tools/test_notes_wiki.py @@ -78,6 +78,8 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) - assert listed["success"] is True assert listed["total_count"] == 1 assert listed["notes"][0]["note_id"] == note_id + assert "content" not in listed["notes"][0] + assert "content_preview" in listed["notes"][0] updated = notes_actions.update_note(note_id=note_id, content="updated finding") assert updated["success"] is True @@ -87,6 +89,15 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) - assert 
listed_after_update["success"] is True assert listed_after_update["total_count"] == 1 assert listed_after_update["notes"][0]["note_id"] == note_id + assert listed_after_update["notes"][0]["content_preview"] == "updated finding" + + listed_with_content = notes_actions.list_notes( + category="findings", + include_content=True, + ) + assert listed_with_content["success"] is True + assert listed_with_content["total_count"] == 1 + assert listed_with_content["notes"][0]["content"] == "updated finding" deleted = notes_actions.delete_note(note_id=note_id) assert deleted["success"] is True @@ -98,3 +109,31 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) - finally: _reset_notes_state() set_global_tracer(previous_tracer) # type: ignore[arg-type] + + +def test_get_note_returns_full_note(tmp_path: Path, monkeypatch) -> None: + monkeypatch.chdir(tmp_path) + _reset_notes_state() + + previous_tracer = get_global_tracer() + tracer = Tracer("get-note-run") + set_global_tracer(tracer) + + try: + created = notes_actions.create_note( + title="Repo wiki", + content="entrypoints and sinks", + category="wiki", + tags=["repo:appsmith"], + ) + assert created["success"] is True + note_id = created["note_id"] + assert isinstance(note_id, str) + + result = notes_actions.get_note(note_id=note_id) + assert result["success"] is True + assert result["note"]["note_id"] == note_id + assert result["note"]["content"] == "entrypoints and sinks" + finally: + _reset_notes_state() + set_global_tracer(previous_tracer) # type: ignore[arg-type]