diff --git a/README.md b/README.md
index 8f5997c..aaf40d0 100644
--- a/README.md
+++ b/README.md
@@ -168,11 +168,17 @@ strix --target https://your-app.com --instruction "Perform authenticated testing
# Multi-target testing (source code + deployed app)
strix -t https://github.com/org/app -t https://your-app.com
+# White-box source-aware scan (local repository)
+strix --target ./app-directory --scan-mode standard
+
# Focused testing with custom instructions
strix --target api.your-app.com --instruction "Focus on business logic flaws and IDOR vulnerabilities"
# Provide detailed instructions through file (e.g., rules of engagement, scope, exclusions)
strix --target api.your-app.com --instruction-file ./instruction.md
+
+# Force PR diff-scope against a specific base branch
+strix -n --target ./ --scan-mode quick --scope-mode diff --diff-base origin/main
```
### Headless Mode
@@ -198,6 +204,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
+ with:
+ fetch-depth: 0
- name: Install Strix
run: curl -sSL https://strix.ai/install | bash
@@ -210,6 +218,11 @@ jobs:
run: strix -n -t ./ --scan-mode quick
```
+> [!TIP]
+> In CI pull request runs, Strix automatically scopes quick reviews to changed files.
+> If diff-scope cannot be resolved, ensure the checkout fetches full history (`fetch-depth: 0`)
+> or pass `--diff-base` explicitly.
+
### Configuration
```bash
diff --git a/containers/Dockerfile b/containers/Dockerfile
index 9c88e29..6eb114a 100644
--- a/containers/Dockerfile
+++ b/containers/Dockerfile
@@ -97,7 +97,36 @@ RUN mkdir -p /home/pentester/.npm-global
RUN npm install -g retire@latest && \
npm install -g eslint@latest && \
- npm install -g js-beautify@latest
+ npm install -g js-beautify@latest && \
+ npm install -g @ast-grep/cli@latest && \
+ npm install -g tree-sitter-cli@latest
+
+RUN set -eux; \
+ TS_PARSER_DIR="/home/pentester/.tree-sitter/parsers"; \
+ mkdir -p "${TS_PARSER_DIR}"; \
+ for repo in tree-sitter-java tree-sitter-javascript tree-sitter-python tree-sitter-go tree-sitter-bash tree-sitter-json tree-sitter-yaml tree-sitter-typescript; do \
+ if [ "$repo" = "tree-sitter-yaml" ]; then \
+ repo_url="https://github.com/tree-sitter-grammars/${repo}.git"; \
+ else \
+ repo_url="https://github.com/tree-sitter/${repo}.git"; \
+ fi; \
+ if [ ! -d "${TS_PARSER_DIR}/${repo}" ]; then \
+ git clone --depth 1 "${repo_url}" "${TS_PARSER_DIR}/${repo}"; \
+ fi; \
+ done; \
+ if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" ]; then \
+ ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" "${TS_PARSER_DIR}/tree-sitter-typescript-typescript"; \
+ fi; \
+ if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" ]; then \
+ ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" "${TS_PARSER_DIR}/tree-sitter-typescript-tsx"; \
+ fi; \
+ tree-sitter init-config >/dev/null 2>&1 || true; \
+ TS_CONFIG="/home/pentester/.config/tree-sitter/config.json"; \
+ mkdir -p "$(dirname "${TS_CONFIG}")"; \
+ [ -f "${TS_CONFIG}" ] || printf '{}\n' > "${TS_CONFIG}"; \
+ TMP_CFG="$(mktemp)"; \
+ jq --arg p "${TS_PARSER_DIR}" '.["parser-directories"] = ((.["parser-directories"] // []) + [$p] | unique)' "${TS_CONFIG}" > "${TMP_CFG}"; \
+ mv "${TMP_CFG}" "${TS_CONFIG}"
WORKDIR /home/pentester/tools
RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
@@ -110,6 +139,18 @@ RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
USER root
RUN curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin
+RUN set -eux; \
+ ARCH="$(uname -m)"; \
+ case "$ARCH" in \
+ x86_64) GITLEAKS_ARCH="x64" ;; \
+ aarch64|arm64) GITLEAKS_ARCH="arm64" ;; \
+ *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \
+ esac; \
+ TAG="$(curl -fsSL https://api.github.com/repos/gitleaks/gitleaks/releases/latest | jq -r .tag_name)"; \
+ curl -fsSL "https://github.com/gitleaks/gitleaks/releases/download/${TAG}/gitleaks_${TAG#v}_linux_${GITLEAKS_ARCH}.tar.gz" -o /tmp/gitleaks.tgz; \
+ tar -xzf /tmp/gitleaks.tgz -C /tmp; \
+ install -m 0755 /tmp/gitleaks /usr/local/bin/gitleaks; \
+ rm -f /tmp/gitleaks /tmp/gitleaks.tgz
RUN apt-get update && apt-get install -y zaproxy
diff --git a/docs/integrations/ci-cd.mdx b/docs/integrations/ci-cd.mdx
index 48213e7..f55ebc1 100644
--- a/docs/integrations/ci-cd.mdx
+++ b/docs/integrations/ci-cd.mdx
@@ -13,6 +13,12 @@ Use the `-n` or `--non-interactive` flag:
strix -n --target ./app --scan-mode quick
```
+For pull-request style CI runs, Strix automatically scopes quick scans to changed files. You can force this behavior and set a base ref explicitly:
+
+```bash
+strix -n --target ./app --scan-mode quick --scope-mode diff --diff-base origin/main
+```
+
## Exit Codes
| Code | Meaning |
@@ -78,3 +84,7 @@ jobs:
All CI platforms require Docker access. Ensure your runner has Docker available.
+
+
+If diff-scope fails in CI, fetch full git history (for example, `fetch-depth: 0` in GitHub Actions) so merge-base and branch comparison can be resolved.
+
diff --git a/docs/integrations/github-actions.mdx b/docs/integrations/github-actions.mdx
index 6399144..5952c3a 100644
--- a/docs/integrations/github-actions.mdx
+++ b/docs/integrations/github-actions.mdx
@@ -18,6 +18,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
- name: Install Strix
run: curl -sSL https://strix.ai/install | bash
@@ -58,3 +60,7 @@ The workflow fails when vulnerabilities are found:
Use `quick` mode for PRs to keep feedback fast. Schedule `deep` scans nightly.
+
+
+For pull_request workflows, Strix automatically uses changed-files diff-scope in CI/headless runs. If diff resolution fails, ensure full history is fetched (`fetch-depth: 0`) or set `--diff-base`.
+
diff --git a/docs/tools/sandbox.mdx b/docs/tools/sandbox.mdx
index c9043b9..386452f 100644
--- a/docs/tools/sandbox.mdx
+++ b/docs/tools/sandbox.mdx
@@ -45,13 +45,21 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o
| [js-beautify](https://github.com/beautifier/js-beautify) | JavaScript deobfuscation |
| [JSHint](https://jshint.com) | JavaScript code quality tool |
+## Source-Aware Analysis
+
+| Tool | Description |
+| ------------------------------------------------------- | --------------------------------------------- |
+| [Semgrep](https://github.com/semgrep/semgrep) | Fast SAST and custom rule matching |
+| [ast-grep](https://ast-grep.github.io) | Structural AST/CST-aware code search (`sg`) |
+| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction (Java/JS/TS/Python/Go/Bash/JSON/YAML grammars pre-configured) |
+| [Bandit](https://bandit.readthedocs.io) | Python security linter |
+
## Secret Detection
| Tool | Description |
| ----------------------------------------------------------- | ------------------------------------- |
| [TruffleHog](https://github.com/trufflesecurity/trufflehog) | Find secrets in code and history |
-| [Semgrep](https://github.com/semgrep/semgrep) | Static analysis for security patterns |
-| [Bandit](https://bandit.readthedocs.io) | Python security linter |
+| [Gitleaks](https://github.com/gitleaks/gitleaks) | Detect hardcoded secrets in repositories |
## Authentication Testing
@@ -64,7 +72,7 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o
| Tool | Description |
| -------------------------- | ---------------------------------------------- |
-| [Trivy](https://trivy.dev) | Container and dependency vulnerability scanner |
+| [Trivy](https://trivy.dev) | Filesystem/container scanning for vulns, misconfigurations, secrets, and licenses |
## HTTP Proxy
diff --git a/docs/tools/terminal.mdx b/docs/tools/terminal.mdx
index 4b28bc7..5f7cb84 100644
--- a/docs/tools/terminal.mdx
+++ b/docs/tools/terminal.mdx
@@ -32,14 +32,18 @@ sqlmap -u "https://example.com/page?id=1"
### Code Analysis
```bash
-# Search for secrets
-trufflehog filesystem ./
-
-# Static analysis
+# Fast SAST triage
semgrep --config auto ./src
-# Grep for patterns
-grep -r "password" ./
+# Structural AST search (pattern-based; `sg scan` requires an sgconfig.yml rule set)
+sg run -p 'eval($ARG)' ./src
+
+# Secret detection
+gitleaks detect --source ./
+trufflehog filesystem ./
+
+# Supply-chain and misconfiguration checks
+trivy fs ./
```
### Custom Scripts
diff --git a/docs/usage/cli.mdx b/docs/usage/cli.mdx
index bfb4e15..bb32009 100644
--- a/docs/usage/cli.mdx
+++ b/docs/usage/cli.mdx
@@ -27,6 +27,14 @@ strix --target [options]
Scan depth: `quick`, `standard`, or `deep`.
+
+ Code scope mode: `auto` (enable PR diff-scope in CI/headless runs), `diff` (force changed-files scope), or `full` (disable diff-scope).
+
+
+
+ Target branch or commit to compare against (e.g., `origin/main`). Defaults to the repository's default branch.
+
+
Run in headless mode without TUI. Ideal for CI/CD.
@@ -50,6 +58,9 @@ strix --target api.example.com --instruction "Focus on IDOR and auth bypass"
# CI/CD mode
strix -n --target ./ --scan-mode quick
+# Force diff-scope against a specific base ref
+strix -n --target ./ --scan-mode quick --scope-mode diff --diff-base origin/main
+
# Multi-target white-box testing
strix -t https://github.com/org/app -t https://staging.example.com
```
diff --git a/docs/usage/scan-modes.mdx b/docs/usage/scan-modes.mdx
index 73ed84d..9f95891 100644
--- a/docs/usage/scan-modes.mdx
+++ b/docs/usage/scan-modes.mdx
@@ -31,6 +31,8 @@ Balanced testing for routine security reviews. Best for:
**Duration**: 30 minutes to 1 hour
+**White-box behavior**: Uses source-aware mapping and static triage to prioritize dynamic exploit validation paths.
+
## Deep
```bash
@@ -44,6 +46,8 @@ Thorough penetration testing. Best for:
**Duration**: 1-4 hours depending on target complexity
+**White-box behavior**: Runs broad source-aware triage (`semgrep`, AST structural search, secrets, supply-chain checks) and then systematically validates top candidates dynamically.
+
Deep mode is the default. It explores edge cases, chained vulnerabilities, and complex attack paths.
diff --git a/strix/agents/StrixAgent/strix_agent.py b/strix/agents/StrixAgent/strix_agent.py
index c8177b0..36e3594 100644
--- a/strix/agents/StrixAgent/strix_agent.py
+++ b/strix/agents/StrixAgent/strix_agent.py
@@ -59,6 +59,7 @@ class StrixAgent(BaseAgent):
async def execute_scan(self, scan_config: dict[str, Any]) -> dict[str, Any]: # noqa: PLR0912
user_instructions = scan_config.get("user_instructions", "")
targets = scan_config.get("targets", [])
+ diff_scope = scan_config.get("diff_scope", {}) or {}
self.llm.set_system_prompt_context(self._build_system_scope_context(scan_config))
repositories = []
@@ -120,6 +121,28 @@ class StrixAgent(BaseAgent):
task_parts.append("\n\nIP Addresses:")
task_parts.extend(f"- {ip}" for ip in ip_addresses)
+ if diff_scope.get("active"):
+ task_parts.append("\n\nScope Constraints:")
+ task_parts.append(
+ "- Pull request diff-scope mode is active. Prioritize changed files "
+ "and use other files only for context."
+ )
+ for repo_scope in diff_scope.get("repos", []):
+ repo_label = (
+ repo_scope.get("workspace_subdir")
+ or repo_scope.get("source_path")
+ or "repository"
+ )
+ changed_count = repo_scope.get("analyzable_files_count", 0)
+ deleted_count = repo_scope.get("deleted_files_count", 0)
+ task_parts.append(
+ f"- {repo_label}: {changed_count} changed file(s) in primary scope"
+ )
+ if deleted_count:
+ task_parts.append(
+ f"- {repo_label}: {deleted_count} deleted file(s) are context-only"
+ )
+
task_description = " ".join(task_parts)
if user_instructions:
diff --git a/strix/agents/StrixAgent/system_prompt.jinja b/strix/agents/StrixAgent/system_prompt.jinja
index 2dd1466..e916b6d 100644
--- a/strix/agents/StrixAgent/system_prompt.jinja
+++ b/strix/agents/StrixAgent/system_prompt.jinja
@@ -111,12 +111,18 @@ BLACK-BOX TESTING (domain/subdomain only):
WHITE-BOX TESTING (code provided):
- MUST perform BOTH static AND dynamic analysis
-- Static: Review code for vulnerabilities
-- Dynamic: Run the application and test live
-- NEVER rely solely on static code analysis - always test dynamically
-- You MUST begin at the very first step by running the code and testing live.
+- Static: Use source-aware triage first to map risk quickly (`semgrep`, `ast-grep`, Tree-sitter tooling, `gitleaks`, `trufflehog`, `trivy fs`). Then review code for vulnerabilities
+- Static coverage floor: execute at least one structural AST mapping pass (`sg` and/or Tree-sitter) per repository and keep artifact output
+- Static coverage target per repository: run one `semgrep` pass, one secrets pass (`gitleaks` and/or `trufflehog`), one `trivy fs` pass, and one AST-structural pass (`sg` and/or Tree-sitter); if any are skipped, record why in the shared wiki
+- Keep AST artifacts bounded and high-signal: scope to relevant paths/hypotheses, avoid whole-repo generic function dumps
+- AST target selection rule: build `sg-targets.txt` from `semgrep.json` scope first (`paths.scanned`, fallback to unique `results[].path`), then run `xargs ... sg run` against that file list. Only use path-heuristic fallback if semgrep scope is unavailable, and log fallback reason in the wiki.
+- Shared memory: Use notes as shared working memory; discover wiki notes with `list_notes`, then read the selected one via `get_note(note_id=...)` before analysis
+- Before `agent_finish`/`finish_scan`, update the shared repo wiki with scanner summaries, key routes/sinks, and dynamic follow-up plan
+- Dynamic: Run the application and test live to validate exploitability
+- NEVER rely solely on static code analysis when dynamic validation is possible
+- Begin with fast source triage and dynamic run preparation in parallel; use static findings to prioritize live testing.
- Local execution, unit/integration testing, patch verification, and HTTP requests against locally started in-scope services are normal authorized white-box validation
-- If dynamically running the code proves impossible after exhaustive attempts, pivot to just comprehensive static analysis.
+- If dynamically running the code proves impossible after exhaustive attempts, pivot to comprehensive static analysis.
- Try to infer how to run the code based on its structure and content.
- FIX discovered vulnerabilities in code in same file.
- Test patches to confirm vulnerability removal.
@@ -460,8 +466,12 @@ JAVASCRIPT ANALYSIS:
CODE ANALYSIS:
- semgrep - Static analysis/SAST
+- ast-grep (sg) - Structural AST/CST-aware code search
+- tree-sitter - Syntax-aware parsing and symbol extraction support
- bandit - Python security linter
- trufflehog - Secret detection in code
+- gitleaks - Secret detection in repository content/history
+- trivy fs - Filesystem vulnerability/misconfiguration/license/secret scanning
SPECIALIZED TOOLS:
- jwt_tool - JWT token manipulation
diff --git a/strix/interface/cli.py b/strix/interface/cli.py
index 430eebc..ec853b3 100644
--- a/strix/interface/cli.py
+++ b/strix/interface/cli.py
@@ -72,9 +72,13 @@ async def run_cli(args: Any) -> None: # noqa: PLR0915
"targets": args.targets_info,
"user_instructions": args.instruction or "",
"run_name": args.run_name,
+ "diff_scope": getattr(args, "diff_scope", {"active": False}),
}
- llm_config = LLMConfig(scan_mode=scan_mode)
+ llm_config = LLMConfig(
+ scan_mode=scan_mode,
+ is_whitebox=bool(getattr(args, "local_sources", [])),
+ )
agent_config = {
"llm_config": llm_config,
"max_iterations": 300,
diff --git a/strix/interface/main.py b/strix/interface/main.py
index 56873f1..7ac6f13 100644
--- a/strix/interface/main.py
+++ b/strix/interface/main.py
@@ -36,6 +36,7 @@ from strix.interface.utils import ( # noqa: E402
image_exists,
infer_target_type,
process_pull_line,
+ resolve_diff_scope_context,
rewrite_localhost_targets,
validate_config_file,
validate_llm_response,
@@ -357,6 +358,28 @@ Examples:
),
)
+ parser.add_argument(
+ "--scope-mode",
+ type=str,
+ choices=["auto", "diff", "full"],
+ default="auto",
+ help=(
+ "Scope mode for code targets: "
+ "'auto' enables PR diff-scope in CI/headless runs, "
+ "'diff' forces changed-files scope, "
+ "'full' disables diff-scope."
+ ),
+ )
+
+ parser.add_argument(
+ "--diff-base",
+ type=str,
+ help=(
+ "Target branch or commit to compare against (e.g., origin/main). "
+ "Defaults to the repository's default branch."
+ ),
+ )
+
parser.add_argument(
"--config",
type=str,
@@ -514,7 +537,7 @@ def persist_config() -> None:
save_current_config()
-def main() -> None:
+def main() -> None: # noqa: PLR0912, PLR0915
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -541,6 +564,38 @@ def main() -> None:
target_info["details"]["cloned_repo_path"] = cloned_path
args.local_sources = collect_local_sources(args.targets_info)
+ try:
+ diff_scope = resolve_diff_scope_context(
+ local_sources=args.local_sources,
+ scope_mode=args.scope_mode,
+ diff_base=args.diff_base,
+ non_interactive=args.non_interactive,
+ )
+ except ValueError as e:
+ console = Console()
+ error_text = Text()
+ error_text.append("DIFF SCOPE RESOLUTION FAILED", style="bold red")
+ error_text.append("\n\n", style="white")
+ error_text.append(str(e), style="white")
+
+ panel = Panel(
+ error_text,
+ title="[bold white]STRIX",
+ title_align="left",
+ border_style="red",
+ padding=(1, 2),
+ )
+ console.print("\n")
+ console.print(panel)
+ console.print()
+ sys.exit(1)
+
+ args.diff_scope = diff_scope.metadata
+ if diff_scope.instruction_block:
+ if args.instruction:
+ args.instruction = f"{diff_scope.instruction_block}\n\n{args.instruction}"
+ else:
+ args.instruction = diff_scope.instruction_block
is_whitebox = bool(args.local_sources)
diff --git a/strix/interface/tool_components/notes_renderer.py b/strix/interface/tool_components/notes_renderer.py
index f4fc1a3..4a410c2 100644
--- a/strix/interface/tool_components/notes_renderer.py
+++ b/strix/interface/tool_components/notes_renderer.py
@@ -117,6 +117,8 @@ class ListNotesRenderer(BaseToolRenderer):
title = note.get("title", "").strip() or "(untitled)"
category = note.get("category", "general")
note_content = note.get("content", "").strip()
+ if not note_content:
+ note_content = note.get("content_preview", "").strip()
text.append("\n - ")
text.append(title)
@@ -131,3 +133,35 @@ class ListNotesRenderer(BaseToolRenderer):
css_classes = cls.get_css_classes("completed")
return Static(text, classes=css_classes)
+
+
+@register_tool_renderer
+class GetNoteRenderer(BaseToolRenderer):
+ tool_name: ClassVar[str] = "get_note"
+ css_classes: ClassVar[list[str]] = ["tool-call", "notes-tool"]
+
+ @classmethod
+ def render(cls, tool_data: dict[str, Any]) -> Static:
+ result = tool_data.get("result")
+
+ text = Text()
+ text.append("◇ ", style="#fbbf24")
+ text.append("note read", style="dim")
+
+ if result and isinstance(result, dict) and result.get("success"):
+ note = result.get("note", {}) or {}
+ title = str(note.get("title", "")).strip() or "(untitled)"
+ category = note.get("category", "general")
+ content = str(note.get("content", "")).strip()
+ text.append("\n ")
+ text.append(title)
+ text.append(f" ({category})", style="dim")
+ if content:
+ text.append("\n ")
+ text.append(content, style="dim")
+ else:
+ text.append("\n ")
+ text.append("Loading...", style="dim")
+
+ css_classes = cls.get_css_classes("completed")
+ return Static(text, classes=css_classes)
diff --git a/strix/interface/tui.py b/strix/interface/tui.py
index f366562..2b35680 100644
--- a/strix/interface/tui.py
+++ b/strix/interface/tui.py
@@ -742,11 +742,16 @@ class StrixTUIApp(App): # type: ignore[misc]
"targets": args.targets_info,
"user_instructions": args.instruction or "",
"run_name": args.run_name,
+ "diff_scope": getattr(args, "diff_scope", {"active": False}),
}
def _build_agent_config(self, args: argparse.Namespace) -> dict[str, Any]:
scan_mode = getattr(args, "scan_mode", "deep")
- llm_config = LLMConfig(scan_mode=scan_mode, interactive=True)
+ llm_config = LLMConfig(
+ scan_mode=scan_mode,
+ interactive=True,
+ is_whitebox=bool(getattr(args, "local_sources", [])),
+ )
config = {
"llm_config": llm_config,
diff --git a/strix/interface/utils.py b/strix/interface/utils.py
index 5b9e52b..3559fa9 100644
--- a/strix/interface/utils.py
+++ b/strix/interface/utils.py
@@ -1,11 +1,13 @@
import ipaddress
import json
+import os
import re
import secrets
import shutil
import subprocess
import sys
import tempfile
+from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
@@ -461,6 +463,612 @@ def generate_run_name(targets_info: list[dict[str, Any]] | None = None) -> str:
# Target processing utilities
+_SUPPORTED_SCOPE_MODES = {"auto", "diff", "full"}
+_MAX_FILES_PER_SECTION = 120
+
+
+@dataclass
+class DiffEntry:
+ status: str
+ path: str
+ old_path: str | None = None
+ similarity: int | None = None
+
+
+@dataclass
+class RepoDiffScope:
+ source_path: str
+ workspace_subdir: str | None
+ base_ref: str
+ merge_base: str
+ added_files: list[str]
+ modified_files: list[str]
+ renamed_files: list[dict[str, Any]]
+ deleted_files: list[str]
+ analyzable_files: list[str]
+ truncated_sections: dict[str, bool] = field(default_factory=dict)
+
+ def to_metadata(self) -> dict[str, Any]:
+ return {
+ "source_path": self.source_path,
+ "workspace_subdir": self.workspace_subdir,
+ "base_ref": self.base_ref,
+ "merge_base": self.merge_base,
+ "added_files": self.added_files,
+ "modified_files": self.modified_files,
+ "renamed_files": self.renamed_files,
+ "deleted_files": self.deleted_files,
+ "analyzable_files": self.analyzable_files,
+ "added_files_count": len(self.added_files),
+ "modified_files_count": len(self.modified_files),
+ "renamed_files_count": len(self.renamed_files),
+ "deleted_files_count": len(self.deleted_files),
+ "analyzable_files_count": len(self.analyzable_files),
+ "truncated_sections": self.truncated_sections,
+ }
+
+
+@dataclass
+class DiffScopeResult:
+ active: bool
+ mode: str
+ instruction_block: str = ""
+ metadata: dict[str, Any] = field(default_factory=dict)
+
+
+def _run_git_command(
+ repo_path: Path, args: list[str], check: bool = True
+) -> subprocess.CompletedProcess[str]:
+ return subprocess.run( # noqa: S603
+ ["git", "-C", str(repo_path), *args], # noqa: S607
+ capture_output=True,
+ text=True,
+ check=check,
+ )
+
+
+def _run_git_command_raw(
+ repo_path: Path, args: list[str], check: bool = True
+) -> subprocess.CompletedProcess[bytes]:
+ return subprocess.run( # noqa: S603
+ ["git", "-C", str(repo_path), *args], # noqa: S607
+ capture_output=True,
+ check=check,
+ )
+
+
+def _is_ci_environment(env: dict[str, str]) -> bool:
+ return any(
+ env.get(key)
+ for key in (
+ "CI",
+ "GITHUB_ACTIONS",
+ "GITLAB_CI",
+ "JENKINS_URL",
+ "BUILDKITE",
+ "CIRCLECI",
+ )
+ )
+
+
+def _is_pr_environment(env: dict[str, str]) -> bool:
+ return any(
+ env.get(key)
+ for key in (
+ "GITHUB_BASE_REF",
+ "GITHUB_HEAD_REF",
+ "CI_MERGE_REQUEST_TARGET_BRANCH_NAME",
+ "GITLAB_MERGE_REQUEST_TARGET_BRANCH_NAME",
+ "SYSTEM_PULLREQUEST_TARGETBRANCH",
+ )
+ )
+
+
+def _is_git_repo(repo_path: Path) -> bool:
+ result = _run_git_command(repo_path, ["rev-parse", "--is-inside-work-tree"], check=False)
+ return result.returncode == 0 and result.stdout.strip().lower() == "true"
+
+
+def _is_repo_shallow(repo_path: Path) -> bool:
+ result = _run_git_command(repo_path, ["rev-parse", "--is-shallow-repository"], check=False)
+ if result.returncode == 0:
+ value = result.stdout.strip().lower()
+ if value in {"true", "false"}:
+ return value == "true"
+
+ git_meta = repo_path / ".git"
+ if git_meta.is_dir():
+ return (git_meta / "shallow").exists()
+ if git_meta.is_file():
+ try:
+ content = git_meta.read_text(encoding="utf-8").strip()
+ except OSError:
+ return False
+ if content.startswith("gitdir:"):
+ git_dir = content.split(":", 1)[1].strip()
+ resolved = (repo_path / git_dir).resolve()
+ return (resolved / "shallow").exists()
+ return False
+
+
+def _git_ref_exists(repo_path: Path, ref: str) -> bool:
+ result = _run_git_command(repo_path, ["rev-parse", "--verify", "--quiet", ref], check=False)
+ return result.returncode == 0
+
+
+def _resolve_origin_head_ref(repo_path: Path) -> str | None:
+ result = _run_git_command(
+ repo_path, ["symbolic-ref", "--quiet", "refs/remotes/origin/HEAD"], check=False
+ )
+ if result.returncode != 0:
+ return None
+ ref = result.stdout.strip()
+ return ref or None
+
+
+def _extract_branch_name(ref: str | None) -> str | None:
+ if not ref:
+ return None
+ value = ref.strip()
+ if not value:
+ return None
+ return value.split("/")[-1]
+
+
+def _extract_github_base_sha(env: dict[str, str]) -> str | None:
+ event_path = env.get("GITHUB_EVENT_PATH", "").strip()
+ if not event_path:
+ return None
+
+ path = Path(event_path)
+ if not path.exists():
+ return None
+
+ try:
+ payload = json.loads(path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ return None
+
+ base_sha = payload.get("pull_request", {}).get("base", {}).get("sha")
+ if isinstance(base_sha, str) and base_sha.strip():
+ return base_sha.strip()
+ return None
+
+
+def _resolve_default_branch_name(repo_path: Path, env: dict[str, str]) -> str | None:
+ github_base_ref = env.get("GITHUB_BASE_REF", "").strip()
+ if github_base_ref:
+ return github_base_ref
+
+ origin_head = _resolve_origin_head_ref(repo_path)
+ if origin_head:
+ branch = _extract_branch_name(origin_head)
+ if branch:
+ return branch
+
+ if _git_ref_exists(repo_path, "refs/remotes/origin/main"):
+ return "main"
+ if _git_ref_exists(repo_path, "refs/remotes/origin/master"):
+ return "master"
+
+ return None
+
+
+def _resolve_base_ref(repo_path: Path, diff_base: str | None, env: dict[str, str]) -> str:
+ if diff_base and diff_base.strip():
+ return diff_base.strip()
+
+ github_base_ref = env.get("GITHUB_BASE_REF", "").strip()
+ if github_base_ref:
+ github_candidate = f"refs/remotes/origin/{github_base_ref}"
+ if _git_ref_exists(repo_path, github_candidate):
+ return github_candidate
+
+ github_base_sha = _extract_github_base_sha(env)
+ if github_base_sha and _git_ref_exists(repo_path, github_base_sha):
+ return github_base_sha
+
+ origin_head = _resolve_origin_head_ref(repo_path)
+ if origin_head and _git_ref_exists(repo_path, origin_head):
+ return origin_head
+
+ if _git_ref_exists(repo_path, "refs/remotes/origin/main"):
+ return "refs/remotes/origin/main"
+
+ if _git_ref_exists(repo_path, "refs/remotes/origin/master"):
+ return "refs/remotes/origin/master"
+
+ raise ValueError(
+ "Unable to resolve a base ref for diff-scope. Pass --diff-base explicitly "
+ "(for example: --diff-base origin/main)."
+ )
+
+
+def _get_current_branch_name(repo_path: Path) -> str | None:
+ result = _run_git_command(repo_path, ["rev-parse", "--abbrev-ref", "HEAD"], check=False)
+ if result.returncode != 0:
+ return None
+ branch_name = result.stdout.strip()
+ if not branch_name or branch_name == "HEAD":
+ return None
+ return branch_name
+
+
+def _parse_name_status_z(raw_output: bytes) -> list[DiffEntry]:
+ if not raw_output:
+ return []
+
+ tokens = [
+ token.decode("utf-8", errors="replace") for token in raw_output.split(b"\x00") if token
+ ]
+ entries: list[DiffEntry] = []
+ index = 0
+
+ while index < len(tokens):
+ token = tokens[index]
+ status_raw = token
+ status_code = status_raw[:1]
+ similarity: int | None = None
+ if len(status_raw) > 1 and status_raw[1:].isdigit():
+ similarity = int(status_raw[1:])
+
+ # Git's -z output for --name-status is:
+        # - non-rename/copy: "<status>\0<path>\0"
+        # - rename/copy: "<status><score>\0<old-path>\0<new-path>\0"
+ if status_code in {"R", "C"} and index + 2 < len(tokens):
+ old_path = tokens[index + 1]
+ new_path = tokens[index + 2]
+ entries.append(
+ DiffEntry(
+ status=status_code,
+ path=new_path,
+ old_path=old_path,
+ similarity=similarity,
+ )
+ )
+ index += 3
+ continue
+
+ if index + 1 < len(tokens):
+ path = tokens[index + 1]
+ entries.append(DiffEntry(status=status_code, path=path, similarity=similarity))
+ index += 2
+ continue
+
+ # Backward-compat fallback if output is tab-delimited unexpectedly.
+ status_fallback, has_tab, first_path = token.partition("\t")
+ if not has_tab:
+ break
+ fallback_code = status_fallback[:1]
+ fallback_similarity: int | None = None
+ if len(status_fallback) > 1 and status_fallback[1:].isdigit():
+ fallback_similarity = int(status_fallback[1:])
+ entries.append(
+ DiffEntry(status=fallback_code, path=first_path, similarity=fallback_similarity)
+ )
+ index += 1
+
+ return entries
+
+
+def _append_unique(container: list[str], seen: set[str], path: str) -> None:
+ if path and path not in seen:
+ seen.add(path)
+ container.append(path)
+
+
+def _classify_diff_entries(entries: list[DiffEntry]) -> dict[str, Any]:
+ added_files: list[str] = []
+ modified_files: list[str] = []
+ deleted_files: list[str] = []
+ renamed_files: list[dict[str, Any]] = []
+ analyzable_files: list[str] = []
+ analyzable_seen: set[str] = set()
+ modified_seen: set[str] = set()
+
+ for entry in entries:
+ path = entry.path
+ if not path:
+ continue
+
+ if entry.status == "D":
+ deleted_files.append(path)
+ continue
+
+ if entry.status == "A":
+ added_files.append(path)
+ _append_unique(analyzable_files, analyzable_seen, path)
+ continue
+
+ if entry.status == "M":
+ _append_unique(modified_files, modified_seen, path)
+ _append_unique(analyzable_files, analyzable_seen, path)
+ continue
+
+ if entry.status == "R":
+ renamed_files.append(
+ {
+ "old_path": entry.old_path,
+ "new_path": path,
+ "similarity": entry.similarity,
+ }
+ )
+ _append_unique(analyzable_files, analyzable_seen, path)
+ if entry.similarity is None or entry.similarity < 100:
+ _append_unique(modified_files, modified_seen, path)
+ continue
+
+ if entry.status == "C":
+ _append_unique(modified_files, modified_seen, path)
+ _append_unique(analyzable_files, analyzable_seen, path)
+ continue
+
+ _append_unique(modified_files, modified_seen, path)
+ _append_unique(analyzable_files, analyzable_seen, path)
+
+ return {
+ "added_files": added_files,
+ "modified_files": modified_files,
+ "deleted_files": deleted_files,
+ "renamed_files": renamed_files,
+ "analyzable_files": analyzable_files,
+ }
+
+
+def _truncate_file_list(
+ files: list[str], max_files: int = _MAX_FILES_PER_SECTION
+) -> tuple[list[str], bool]:
+ if len(files) <= max_files:
+ return files, False
+ return files[:max_files], True
+
+
def _append_file_section(
    lines: list[str],
    scope: RepoDiffScope,
    section_key: str,
    files: list[str],
    header: str,
) -> bool:
    """Append one truncation-aware bullet list for a diff category.

    Records the truncation flag under ``scope.truncated_sections[section_key]``
    and returns True when at least one file was listed.
    """
    shown, truncated = _truncate_file_list(files)
    scope.truncated_sections[section_key] = truncated
    if not shown:
        return False
    lines.append(header)
    lines.extend(f"- {path}" for path in shown)
    if truncated:
        lines.append(f"- ... ({len(files) - len(shown)} more files)")
    return True


def build_diff_scope_instruction(scopes: list[RepoDiffScope]) -> str:
    """Render the PR diff-scope instruction block for the agent prompt.

    Produces a preamble explaining how to treat added/modified files, then one
    section per repository listing the changed files by category. Side effect:
    updates each scope's ``truncated_sections`` bookkeeping as lists are capped.

    Args:
        scopes: Resolved per-repository diff scopes.

    Returns:
        The instruction text, with leading/trailing whitespace stripped.
    """
    lines = [
        "The user is requesting a review of a Pull Request.",
        "Instruction: Direct your analysis primarily at the changes in the listed files. "
        "You may reference other files in the repository for context (imports, definitions, "
        "usage), but report findings only if they relate to the listed changes.",
        "For Added files, review the entire file content.",
        "For Modified files, focus primarily on the changed areas.",
    ]

    for scope in scopes:
        repo_name = scope.workspace_subdir or Path(scope.source_path).name or "repository"
        lines.append("")
        lines.append(f"Repository Scope: {repo_name}")
        lines.append(f"Base reference: {scope.base_ref}")
        lines.append(f"Merge base: {scope.merge_base}")

        if not _append_file_section(
            lines,
            scope,
            "analyzable_files",
            scope.analyzable_files,
            "Primary Focus (changed files to analyze):",
        ):
            lines.append("Primary Focus: No analyzable changed files detected.")

        _append_file_section(
            lines,
            scope,
            "added_files",
            scope.added_files,
            "Added files (review entire file):",
        )
        _append_file_section(
            lines,
            scope,
            "modified_files",
            scope.modified_files,
            "Modified files (focus on changes):",
        )

        if scope.renamed_files:
            lines.append("Renamed files:")
            for rename in scope.renamed_files:
                old_path = rename.get("old_path") or "unknown"
                new_path = rename.get("new_path") or "unknown"
                similarity = rename.get("similarity")
                # Exclude bool explicitly: bool is an int subclass, and a stray
                # True/False must not render as "similarity True%".
                if isinstance(similarity, int) and not isinstance(similarity, bool):
                    lines.append(f"- {old_path} -> {new_path} (similarity {similarity}%)")
                else:
                    lines.append(f"- {old_path} -> {new_path}")

        _append_file_section(
            lines,
            scope,
            "deleted_files",
            scope.deleted_files,
            "Note: These files were deleted (context only, not analyzable):",
        )

    return "\n".join(lines).strip()
+
+
def _should_activate_auto_scope(
    local_sources: list[dict[str, str]], non_interactive: bool, env: dict[str, str]
) -> bool:
    """Decide whether 'auto' scope mode should enable diff-scope.

    Activation requires local sources, headless execution, and a CI
    environment. Within CI, a detected PR context activates immediately;
    otherwise activation requires at least one local git repo checked out
    on a branch other than its default branch.
    """
    if not (local_sources and non_interactive and _is_ci_environment(env)):
        return False
    if _is_pr_environment(env):
        return True

    def _on_feature_branch(path_str: str) -> bool:
        repo = Path(path_str)
        if not _is_git_repo(repo):
            return False
        branch = _get_current_branch_name(repo)
        default = _resolve_default_branch_name(repo, env)
        return bool(branch and default and branch != default)

    return any(
        _on_feature_branch(source["source_path"])
        for source in local_sources
        if source.get("source_path")
    )
+
+
def _resolve_repo_diff_scope(
    source: dict[str, str], diff_base: str | None, env: dict[str, str]
) -> RepoDiffScope:
    """Compute the changed-file scope for one local git source.

    Resolves a base ref, finds its merge-base with HEAD, and classifies the
    files changed between merge-base and HEAD into added / modified /
    renamed / deleted buckets.

    Args:
        source: Mapping with "source_path" (repo checkout path) and,
            optionally, "workspace_subdir".
        diff_base: Explicit base ref override (e.g. "origin/main"), or None
            to let _resolve_base_ref choose from the environment.
        env: Environment variables consulted during base-ref resolution.

    Returns:
        A populated RepoDiffScope for this repository.

    Raises:
        ValueError: If the path is not a git repo, the clone is shallow, or
            any git invocation required for diff-scope fails.
    """
    source_path = source.get("source_path", "")
    workspace_subdir = source.get("workspace_subdir")
    repo_path = Path(source_path)

    if not _is_git_repo(repo_path):
        raise ValueError(f"Source is not a git repository: {source_path}")

    # Shallow clones typically cannot reach the merge-base; fail fast with
    # actionable CI guidance instead of a cryptic git error later on.
    if _is_repo_shallow(repo_path):
        raise ValueError(
            "Strix requires full git history for diff-scope. Please set fetch-depth: 0 "
            "in your CI config."
        )

    base_ref = _resolve_base_ref(repo_path, diff_base, env)
    # check=False so git failures surface as ValueError carrying stderr text.
    merge_base_result = _run_git_command(repo_path, ["merge-base", base_ref, "HEAD"], check=False)
    if merge_base_result.returncode != 0:
        stderr = merge_base_result.stderr.strip()
        raise ValueError(
            f"Unable to compute merge-base against '{base_ref}' for '{source_path}'. "
            f"{stderr or 'Ensure the base branch history is fetched and reachable.'}"
        )

    merge_base = merge_base_result.stdout.strip()
    # A zero exit with empty stdout still means no usable merge-base.
    if not merge_base:
        raise ValueError(
            f"Unable to compute merge-base against '{base_ref}' for '{source_path}'. "
            "Ensure the base branch history is fetched and reachable."
        )

    # NUL-separated name-status output with rename/copy detection. The raw
    # (bytes) runner is used here — presumably because paths may not be valid
    # UTF-8; stderr is decoded defensively below. TODO confirm.
    diff_result = _run_git_command_raw(
        repo_path,
        [
            "diff",
            "--name-status",
            "-z",
            "--find-renames",
            "--find-copies",
            f"{merge_base}...HEAD",
        ],
        check=False,
    )
    if diff_result.returncode != 0:
        stderr = diff_result.stderr.decode("utf-8", errors="replace").strip()
        raise ValueError(
            f"Unable to resolve changed files for '{source_path}'. "
            f"{stderr or 'Ensure the repository has enough history for diff-scope.'}"
        )

    entries = _parse_name_status_z(diff_result.stdout)
    classified = _classify_diff_entries(entries)

    return RepoDiffScope(
        source_path=source_path,
        workspace_subdir=workspace_subdir,
        base_ref=base_ref,
        merge_base=merge_base,
        added_files=classified["added_files"],
        modified_files=classified["modified_files"],
        renamed_files=classified["renamed_files"],
        deleted_files=classified["deleted_files"],
        analyzable_files=classified["analyzable_files"],
    )
+
+
def resolve_diff_scope_context(
    local_sources: list[dict[str, str]],
    scope_mode: str,
    diff_base: str | None,
    non_interactive: bool,
    env: dict[str, str] | None = None,
) -> DiffScopeResult:
    """Resolve whether diff-scope applies and build its instruction/metadata.

    Args:
        local_sources: Local target descriptors (each with "source_path").
        scope_mode: One of the supported modes ("full" disables diff-scope,
            "auto" activates it heuristically, "diff" forces it).
        diff_base: Optional explicit base ref for the diff.
        non_interactive: Whether the run is headless (used by auto mode).
        env: Environment mapping; defaults to ``os.environ``.

    Returns:
        A DiffScopeResult; when active, it carries the instruction block and
        per-repository metadata.

    Raises:
        ValueError: For an unsupported mode, or — outside auto mode — when
            diff-scope is active but no usable git repository is available.
    """
    if scope_mode not in _SUPPORTED_SCOPE_MODES:
        raise ValueError(f"Unsupported scope mode: {scope_mode}")

    env_map = dict(os.environ if env is None else env)
    # Fix: `metadata` was previously annotated twice in this scope (a mypy
    # no-redef error); build each metadata dict through a single path instead.
    inactive_metadata: dict[str, Any] = {"active": False, "mode": scope_mode}

    if scope_mode == "full":
        return DiffScopeResult(active=False, mode=scope_mode, metadata=inactive_metadata)

    if scope_mode == "auto" and not _should_activate_auto_scope(
        local_sources, non_interactive, env_map
    ):
        return DiffScopeResult(active=False, mode=scope_mode, metadata=inactive_metadata)

    if not local_sources:
        raise ValueError("Diff-scope is active, but no local repository targets were provided.")

    repo_scopes: list[RepoDiffScope] = []
    skipped_non_git: list[str] = []
    skipped_diff_scope: list[str] = []
    for source in local_sources:
        source_path = source.get("source_path")
        if not source_path:
            continue
        if not _is_git_repo(Path(source_path)):
            skipped_non_git.append(source_path)
            continue
        try:
            repo_scopes.append(_resolve_repo_diff_scope(source, diff_base, env_map))
        except ValueError as e:
            # Auto mode is best-effort per repository; forced modes propagate.
            if scope_mode == "auto":
                skipped_diff_scope.append(f"{source_path} (diff-scope skipped: {e})")
                continue
            raise

    def _with_skips(metadata: dict[str, Any]) -> dict[str, Any]:
        """Attach the skipped-source bookkeeping, omitting empty lists."""
        if skipped_non_git:
            metadata["skipped_non_git_sources"] = skipped_non_git
        if skipped_diff_scope:
            metadata["skipped_diff_scope_sources"] = skipped_diff_scope
        return metadata

    if not repo_scopes:
        if scope_mode == "auto":
            return DiffScopeResult(
                active=False, mode=scope_mode, metadata=_with_skips(inactive_metadata)
            )
        raise ValueError(
            "Diff-scope is active, but no Git repositories were found. "
            "Use --scope-mode full to disable diff-scope for this run."
        )

    metadata = _with_skips(
        {
            "active": True,
            "mode": scope_mode,
            "repos": [scope.to_metadata() for scope in repo_scopes],
            "total_repositories": len(repo_scopes),
            "total_analyzable_files": sum(len(s.analyzable_files) for s in repo_scopes),
            "total_deleted_files": sum(len(s.deleted_files) for s in repo_scopes),
        }
    )

    return DiffScopeResult(
        active=True,
        mode=scope_mode,
        instruction_block=build_diff_scope_instruction(repo_scopes),
        metadata=metadata,
    )
+
def _is_http_git_repo(url: str) -> bool:
check_url = f"{url.rstrip('/')}/info/refs?service=git-upload-pack"
diff --git a/strix/llm/config.py b/strix/llm/config.py
index 9c4757a..017c776 100644
--- a/strix/llm/config.py
+++ b/strix/llm/config.py
@@ -13,6 +13,7 @@ class LLMConfig:
skills: list[str] | None = None,
timeout: int | None = None,
scan_mode: str = "deep",
+ is_whitebox: bool = False,
interactive: bool = False,
reasoning_effort: str | None = None,
system_prompt_context: dict[str, Any] | None = None,
@@ -33,7 +34,7 @@ class LLMConfig:
self.timeout = timeout or int(Config.get("llm_timeout") or "300")
self.scan_mode = scan_mode if scan_mode in ["quick", "standard", "deep"] else "deep"
-
+ self.is_whitebox = is_whitebox
self.interactive = interactive
self.reasoning_effort = reasoning_effort
self.system_prompt_context = system_prompt_context or {}
diff --git a/strix/llm/llm.py b/strix/llm/llm.py
index c8827e3..4f62495 100644
--- a/strix/llm/llm.py
+++ b/strix/llm/llm.py
@@ -111,6 +111,9 @@ class LLM:
def _get_skills_to_load(self) -> list[str]:
ordered_skills = [*self._active_skills]
ordered_skills.append(f"scan_modes/{self.config.scan_mode}")
+ if self.config.is_whitebox:
+ ordered_skills.append("coordination/source_aware_whitebox")
+ ordered_skills.append("custom/source_aware_sast")
deduped: list[str] = []
seen: set[str] = set()
diff --git a/strix/skills/README.md b/strix/skills/README.md
index 1d4f71d..501c8b0 100644
--- a/strix/skills/README.md
+++ b/strix/skills/README.md
@@ -38,6 +38,10 @@ The skills are dynamically injected into the agent's system prompt, allowing it
| **`/reconnaissance`** | Advanced information gathering and enumeration techniques for comprehensive attack surface mapping |
| **`/custom`** | Community-contributed skills for specialized or industry-specific testing scenarios |
+Notable source-aware skills:
+- `source_aware_whitebox` (coordination): white-box orchestration playbook
+- `source_aware_sast` (custom): semgrep/AST/secrets/supply-chain static triage workflow
+
---
## 🎨 Creating New Skills
diff --git a/strix/skills/coordination/source_aware_whitebox.md b/strix/skills/coordination/source_aware_whitebox.md
new file mode 100644
index 0000000..58f0a8b
--- /dev/null
+++ b/strix/skills/coordination/source_aware_whitebox.md
@@ -0,0 +1,68 @@
+---
+name: source-aware-whitebox
+description: Coordination playbook for source-aware white-box testing with static triage and dynamic validation
+---
+
+# Source-Aware White-Box Coordination
+
+Use this coordination playbook when repository source code is available.
+
+## Objective
+
+Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is expected by default when source is available.
+
+## Recommended Workflow
+
+1. Build a quick source map before deep exploitation, including at least one AST-structural pass (`sg` or `tree-sitter`) scoped to relevant paths.
+ - For `sg` baseline, derive `sg-targets.txt` from `semgrep.json` scope first (`paths.scanned`, fallback to unique `results[].path`) and run `xargs ... sg run` on that list.
+ - Only fall back to path heuristics when semgrep scope is unavailable, and record the fallback reason in the repo wiki.
+2. Run first-pass static triage to rank high-risk paths.
+3. Use triage outputs to prioritize dynamic PoC validation.
+4. Keep findings evidence-driven: no report without validation.
+5. Keep shared wiki memory current so all agents can reuse context.
+
+## Source-Aware Triage Stack
+
+- `semgrep`: fast security-first triage and custom pattern scans
+- `ast-grep` (`sg`): structural pattern hunting and targeted repo mapping
+- `tree-sitter`: syntax-aware parsing support for symbol and route extraction
+- `gitleaks` + `trufflehog`: complementary secret detection (working tree and history coverage)
+- `trivy fs`: dependency, misconfiguration, license, and secret checks
+
+Coverage target per repository:
+- one `semgrep` pass
+- one AST structural pass (`sg` and/or `tree-sitter`)
+- one secrets pass (`gitleaks` and/or `trufflehog`)
+- one `trivy fs` pass
+- if any part is skipped, log the reason in the shared wiki note
+
+## Agent Delegation Guidance
+
+- Keep child agents specialized by vulnerability/component as usual.
+- For source-heavy subtasks, prefer creating child agents with `source_aware_sast` skill.
+- Use source findings to shape payloads and endpoint selection for dynamic testing.
+
+## Wiki Note Requirement (Source Map)
+
+When source is present, maintain one wiki note per repository and keep it current.
+
+Operational rules:
+- At task start, call `list_notes` with `category=wiki`, then read the selected wiki with `get_note(note_id=...)`.
+- If no repo wiki exists, create one with `create_note` and `category=wiki`.
+- Update the same wiki via `update_note`; avoid creating duplicate wiki notes for the same repo.
+- Child agents should read wiki notes first via `get_note`, then extend with new evidence from their scope.
+- Before calling `agent_finish`, each source-focused child agent should append a short delta update to the shared repo wiki (scanner outputs, route/sink map deltas, dynamic follow-ups).
+
+Recommended sections:
+- Architecture overview
+- Entrypoints and routing
+- AuthN/AuthZ model
+- High-risk sinks and trust boundaries
+- Static scanner summary
+- Dynamic validation follow-ups
+
+## Validation Guardrails
+
+- Static findings are hypotheses until validated.
+- Dynamic exploitation evidence is still required before vulnerability reporting.
+- Keep scanner output concise, deduplicated, and mapped to concrete code locations.
diff --git a/strix/skills/custom/source_aware_sast.md b/strix/skills/custom/source_aware_sast.md
new file mode 100644
index 0000000..f829349
--- /dev/null
+++ b/strix/skills/custom/source_aware_sast.md
@@ -0,0 +1,167 @@
+---
+name: source-aware-sast
+description: Practical source-aware SAST and AST playbook for semgrep, ast-grep, gitleaks, and trivy fs
+---
+
+# Source-Aware SAST Playbook
+
+Use this skill for source-heavy analysis where static and structural signals should guide dynamic testing.
+
+## Fast Start
+
+Run tools from repo root and store outputs in a dedicated artifact directory:
+
+```bash
+mkdir -p /workspace/.strix-source-aware
+```
+
+Before scanning, check shared wiki memory:
+
+```text
+1) list_notes(category="wiki")
+2) get_note(note_id=...) for the selected repo wiki before analysis
+3) Reuse matching repo wiki note if present
+4) create_note(category="wiki") only if missing
+```
+
+After every major source-analysis batch, update the same repo wiki note with `update_note` so other agents can reuse your latest map.
+
+## Baseline Coverage Bundle (Recommended)
+
+Run this baseline once per repository before deep narrowing:
+
+```bash
+ART=/workspace/.strix-source-aware
+mkdir -p "$ART"
+
+semgrep scan --config p/default --config p/golang --config p/secrets \
+ --metrics=off --json --output "$ART/semgrep.json" .
+# Build deterministic AST targets from semgrep scope (no hardcoded path guessing)
+python3 - <<'PY'
+import json
+from pathlib import Path
+
+art = Path("/workspace/.strix-source-aware")
+semgrep_json = art / "semgrep.json"
+targets_file = art / "sg-targets.txt"
+
+try:
+ data = json.loads(semgrep_json.read_text(encoding="utf-8"))
+except Exception:
+ targets_file.write_text("", encoding="utf-8")
+ raise
+
+scanned = data.get("paths", {}).get("scanned") or []
+if not scanned:
+ scanned = sorted(
+ {
+ r.get("path")
+ for r in data.get("results", [])
+ if isinstance(r, dict) and isinstance(r.get("path"), str) and r.get("path")
+ }
+ )
+
+bounded = scanned[:4000]
+targets_file.write_text("".join(f"{p}\n" for p in bounded), encoding="utf-8")
+print(f"sg-targets: {len(bounded)}")
+PY
+xargs -r -n 200 sg run --pattern '$F($$$ARGS)' --json=stream < "$ART/sg-targets.txt" \
+ > "$ART/ast-grep.json" 2> "$ART/ast-grep.log" || true
+gitleaks detect --source . --report-format json --report-path "$ART/gitleaks.json" || true
+trufflehog filesystem --no-update --json --no-verification . > "$ART/trufflehog.json" || true
+# Keep trivy focused on vuln/misconfig (secrets already covered above) and increase timeout for large repos
+trivy fs --scanners vuln,misconfig --timeout 30m --offline-scan \
+ --format json --output "$ART/trivy-fs.json" . || true
+```
+
+If one tool is skipped or fails, record that in the shared wiki note along with the reason.
+
+## Semgrep First Pass
+
+Use Semgrep as the default static triage pass:
+
+```bash
+# Preferred deterministic profile set (works with --metrics=off)
+semgrep scan --config p/default --config p/golang --config p/secrets \
+ --metrics=off --json --output /workspace/.strix-source-aware/semgrep.json .
+
+# If you choose auto config, do not combine it with --metrics=off
+semgrep scan --config auto --json --output /workspace/.strix-source-aware/semgrep-auto.json .
+```
+
+If diff scope is active, restrict to changed files first, then expand only when needed.
+
+## AST-Grep Structural Mapping
+
+Use `sg` for structure-aware code hunting:
+
+```bash
+# Ruleless structural pass over deterministic target list (no sgconfig.yml required)
+xargs -r -n 200 sg run --pattern '$F($$$ARGS)' --json=stream \
+ < /workspace/.strix-source-aware/sg-targets.txt \
+ > /workspace/.strix-source-aware/ast-grep.json 2> /workspace/.strix-source-aware/ast-grep.log || true
+```
+
+Target high-value patterns such as:
+- missing auth checks near route handlers
+- dynamic command/query construction
+- unsafe deserialization or template execution paths
+- file and path operations influenced by user input
+
+## Tree-Sitter Assisted Repo Mapping
+
+Use tree-sitter CLI for syntax-aware parsing when grep-level mapping is noisy:
+
+```bash
+# `tree-sitter parse` requires explicit targets; -q reports only files with parse errors
+tree-sitter parse -q path/to/file1 path/to/file2
+```
+
+Use outputs to improve route/symbol/sink maps for subsequent targeted scans.
+
+## Secret and Supply Chain Coverage
+
+Detect hardcoded credentials:
+
+```bash
+gitleaks detect --source . --report-format json --report-path /workspace/.strix-source-aware/gitleaks.json
+trufflehog filesystem --json . > /workspace/.strix-source-aware/trufflehog.json
+```
+
+Run repository-wide dependency and config checks:
+
+```bash
+trivy fs --scanners vuln,misconfig --timeout 30m --offline-scan \
+ --format json --output /workspace/.strix-source-aware/trivy-fs.json . || true
+```
+
+## Converting Static Signals Into Exploits
+
+1. Rank candidates by impact and exploitability.
+2. Trace source-to-sink flow for top candidates.
+3. Build dynamic PoCs that reproduce the suspected issue.
+4. Report only after dynamic validation succeeds.
+
+## Wiki Update Template
+
+Keep one wiki note per repository and update these sections:
+
+```text
+## Architecture
+## Entrypoints
+## AuthN/AuthZ
+## High-Risk Sinks
+## Static Findings Summary
+## Dynamic Validation Follow-Ups
+```
+
+Before `agent_finish`, make one final `update_note` call to capture:
+- scanner artifacts and paths
+- top validated/invalidated hypotheses
+- concrete dynamic follow-up tasks
+
+## Anti-Patterns
+
+- Do not treat scanner output as final truth.
+- Do not spend full cycles on low-signal pattern matches.
+- Do not report source-only findings without validation evidence.
+- Do not create multiple wiki notes for the same repository when one already exists.
diff --git a/strix/skills/scan_modes/deep.md b/strix/skills/scan_modes/deep.md
index 4235f11..a2687fe 100644
--- a/strix/skills/scan_modes/deep.md
+++ b/strix/skills/scan_modes/deep.md
@@ -15,6 +15,11 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
**Whitebox (source available)**
- Map every file, module, and code path in the repository
+- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")` then `get_note(note_id=...)`), then continuously update one repo note
+- Start with broad source-aware triage (`semgrep`, `ast-grep`, `gitleaks`, `trufflehog`, `trivy fs`) and use outputs to drive deep review
+- Execute at least one structural AST pass (`sg` and/or Tree-sitter) per repository and store artifacts for reuse
+- Keep AST artifacts bounded and query-driven (target relevant paths/sinks first; avoid whole-repo generic function dumps)
+- Use syntax-aware parsing (Tree-sitter tooling) to improve symbol, route, and sink extraction quality
- Trace all entry points from HTTP handlers to database queries
- Document all authentication mechanisms and implementations
- Map authorization checks and access control model
@@ -25,7 +30,8 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
- Identify all serialization/deserialization points
- Review file handling: upload, download, processing
- Understand the deployment model and infrastructure assumptions
-- Check all dependency versions against CVE databases
+- Check all dependency versions and repository risks against CVE/misconfiguration data
+- Before final completion, update the shared repo wiki with scanner summary + dynamic follow-ups
**Blackbox (no source)**
- Exhaustive subdomain enumeration with multiple sources and tools
diff --git a/strix/skills/scan_modes/quick.md b/strix/skills/scan_modes/quick.md
index 506ffc4..7e8f36f 100644
--- a/strix/skills/scan_modes/quick.md
+++ b/strix/skills/scan_modes/quick.md
@@ -15,9 +15,15 @@ Optimize for fast feedback on critical security issues. Skip exhaustive enumerat
**Whitebox (source available)**
- Focus on recent changes: git diffs, new commits, modified files—these are most likely to contain fresh bugs
+- Read existing `wiki` notes first (`list_notes(category="wiki")` then `get_note(note_id=...)`) to avoid remapping from scratch
+- Run a fast static triage on changed files first (`semgrep`, then targeted `sg` queries)
+- Run at least one lightweight AST pass (`sg` or Tree-sitter) so structural mapping is not skipped
+- Keep AST commands tightly scoped to changed or high-risk paths; avoid broad repository-wide pattern dumps
+- Run quick secret and dependency checks (`gitleaks`, `trufflehog`, `trivy fs`) scoped to changed areas when possible
- Identify security-sensitive patterns in changed code: auth checks, input handling, database queries, file operations
- Trace user input through modified code paths
- Check if security controls were modified or bypassed
+- Before completion, update the shared repo wiki with what changed and what needs dynamic follow-up
**Blackbox (no source)**
- Map authentication and critical user flows
diff --git a/strix/skills/scan_modes/standard.md b/strix/skills/scan_modes/standard.md
index a13b786..13f3f70 100644
--- a/strix/skills/scan_modes/standard.md
+++ b/strix/skills/scan_modes/standard.md
@@ -15,12 +15,17 @@ Systematic testing across the full attack surface. Understand the application be
**Whitebox (source available)**
- Map codebase structure: modules, entry points, routing
+- Start by loading existing `wiki` notes (`list_notes(category="wiki")` then `get_note(note_id=...)`) and update one shared repo note as mapping evolves
+- Run `semgrep` first-pass triage to prioritize risky flows before deep manual review
+- Run at least one AST-structural mapping pass (`sg` and/or Tree-sitter), then use outputs for route, sink, and trust-boundary mapping
+- Keep AST output bounded to relevant paths and hypotheses; avoid whole-repo generic function dumps
- Identify architecture pattern (MVC, microservices, monolith)
- Trace input vectors: forms, APIs, file uploads, headers, cookies
- Review authentication and authorization flows
- Analyze database interactions and ORM usage
-- Check dependencies for known CVEs
+- Check dependencies and repo risks with `trivy fs`, `gitleaks`, and `trufflehog`
- Understand the data model and sensitive data locations
+- Before completion, update the shared repo wiki with source findings summary and dynamic validation next steps
**Blackbox (no source)**
- Crawl application thoroughly, interact with every feature
diff --git a/strix/tools/agents_graph/agents_graph_actions.py b/strix/tools/agents_graph/agents_graph_actions.py
index d4425b7..76313d7 100644
--- a/strix/tools/agents_graph/agents_graph_actions.py
+++ b/strix/tools/agents_graph/agents_graph_actions.py
@@ -1,5 +1,6 @@
import threading
from datetime import UTC, datetime
+import re
from typing import Any, Literal
from strix.tools.registry import register_tool
@@ -21,6 +22,140 @@ _agent_instances: dict[str, Any] = {}
_agent_states: dict[str, Any] = {}
def _is_whitebox_agent(agent_id: str) -> bool:
    """Return True when the registered agent's LLM config enables white-box mode."""
    agent = _agent_instances.get(agent_id)
    llm_config = getattr(agent, "llm_config", None)
    return bool(getattr(llm_config, "is_whitebox", False))
+
+
+def _extract_repo_tags(agent_state: Any | None) -> set[str]:
+ repo_tags: set[str] = set()
+ if agent_state is None:
+ return repo_tags
+
+ task_text = str(getattr(agent_state, "task", "") or "")
+ for workspace_subdir in re.findall(r"/workspace/([A-Za-z0-9._-]+)", task_text):
+ repo_tags.add(f"repo:{workspace_subdir.lower()}")
+
+ for repo_name in re.findall(r"github\.com/[^/\s]+/([A-Za-z0-9._-]+)", task_text):
+ normalized = repo_name.removesuffix(".git").lower()
+ if normalized:
+ repo_tags.add(f"repo:{normalized}")
+
+ return repo_tags
+
+
def _load_primary_wiki_note(agent_state: Any | None = None) -> dict[str, Any] | None:
    """Best-effort lookup of the most relevant shared wiki note.

    Prefers a note whose tags overlap the repo tags inferred from the agent's
    task text; otherwise falls back to the first listed wiki note. Returns
    None when no usable note exists or when any lookup step fails.
    """
    try:
        from strix.tools.notes.notes_actions import get_note, list_notes

        listing = list_notes(category="wiki")
        if not listing.get("success"):
            return None
        notes = listing.get("notes") or []
        if not notes:
            return None

        wanted_tags = _extract_repo_tags(agent_state)
        chosen_id = None
        if wanted_tags:
            for candidate in notes:
                raw_tags = candidate.get("tags") or []
                if not isinstance(raw_tags, list):
                    continue
                candidate_tags = {str(tag).strip().lower() for tag in raw_tags if str(tag).strip()}
                if candidate_tags & wanted_tags:
                    chosen_id = candidate.get("note_id")
                    break

        note_id = chosen_id or notes[0].get("note_id")
        if not isinstance(note_id, str) or not note_id:
            return None

        fetched = get_note(note_id=note_id)
        if not fetched.get("success"):
            return None

        note = fetched.get("note")
        if not isinstance(note, dict):
            return None

    except Exception:  # best-effort lookup; never fail the caller
        return None
    else:
        return note
+
+
def _inject_wiki_context_for_whitebox(agent_state: Any) -> None:
    """Seed a new white-box agent's conversation with the shared repo wiki.

    No-op unless the agent runs in white-box mode and a non-empty primary
    wiki note can be loaded. The note content (capped at 4000 characters)
    is injected as a user message before the agent starts working.
    """
    if not _is_whitebox_agent(agent_state.agent_id):
        return

    wiki_note = _load_primary_wiki_note(agent_state)
    if not wiki_note:
        return

    title = str(wiki_note.get("title") or "repo wiki")
    content = str(wiki_note.get("content") or "").strip()
    if not content:
        return

    # Cap injected context so a large wiki cannot dominate the prompt budget.
    max_chars = 4000
    truncated_content = content[:max_chars]
    suffix = "\n\n[truncated for context size]" if len(content) > max_chars else ""
    # NOTE(review): the message below is assembled from bare f-string pieces
    # and `title` is computed but never used — it looks like wrapper markup
    # (e.g. a tag carrying the title) was lost; confirm the intended format.
    agent_state.add_message(
        "user",
        (
            f"\n"
            f"{truncated_content}{suffix}\n"
            ""
        ),
    )
+
+
def _append_wiki_update_on_finish(
    agent_state: Any,
    agent_name: str,
    result_summary: str,
    findings: list[str] | None,
    final_recommendations: list[str] | None,
) -> None:
    """Append a finishing agent's summary delta to the shared repo wiki note.

    Runs only for white-box agents; any failure is swallowed so that note
    persistence can never block agent completion.
    """
    if not _is_whitebox_agent(agent_state.agent_id):
        return

    try:
        from strix.tools.notes.notes_actions import append_note_content

        wiki_note = _load_primary_wiki_note(agent_state)
        if not wiki_note:
            return

        note_id = wiki_note.get("note_id")
        if not isinstance(note_id, str) or not note_id:
            return

        # Collapse whitespace and cap the summary so the wiki stays compact.
        summary = " ".join(str(result_summary).split())
        if len(summary) > 1200:
            summary = f"{summary[:1197]}..."

        def _bullets(items: list[str] | None) -> str:
            """Render a bullet list, with an explicit '- none' placeholder."""
            return "\n".join(f"- {item}" for item in (items or [])) or "- none"

        timestamp = datetime.now(UTC).isoformat()
        delta = (
            f"\n\n## Agent Update: {agent_name} ({timestamp})\n"
            f"Summary: {summary}\n\n"
            "Findings:\n"
            f"{_bullets(findings)}\n\n"
            "Recommendations:\n"
            f"{_bullets(final_recommendations)}\n"
        )
        append_note_content(note_id=note_id, delta=delta)
    except Exception:
        # Best-effort update; never block agent completion on note persistence.
        return
+
+
def _run_agent_in_thread(
agent: Any, state: Any, inherited_messages: list[dict[str, Any]]
) -> dict[str, Any]:
@@ -31,6 +166,8 @@ def _run_agent_in_thread(
state.add_message(msg["role"], msg["content"])
state.add_message("user", "")
+ _inject_wiki_context_for_whitebox(state)
+
parent_info = _agent_graph["nodes"].get(state.parent_id, {})
parent_name = parent_info.get("name", "Unknown Parent")
@@ -39,6 +176,18 @@ def _run_agent_in_thread(
if inherited_messages
else "started with a fresh context"
)
+ wiki_memory_instruction = ""
+ if getattr(getattr(agent, "llm_config", None), "is_whitebox", False):
+ wiki_memory_instruction = (
+ '\n - White-box memory (recommended): call list_notes(category="wiki") and then '
+ "get_note(note_id=...) before substantive work (including terminal scans)"
+ "\n - Reuse one repo wiki note where possible and avoid duplicates"
+ "\n - Before agent_finish, call list_notes(category=\"wiki\") + get_note(note_id=...) again, then append a short scope delta via update_note (new routes/sinks, scanner results, dynamic follow-ups)"
+ "\n - If terminal output contains `command not found` or shell parse errors, correct and rerun before using the result"
+ "\n - Use ASCII-only shell commands; if a command includes unexpected non-ASCII characters, rerun with a clean ASCII command"
+ "\n - Keep AST artifacts bounded: target relevant paths and avoid whole-repo generic function dumps"
+ "\n - Source-aware tooling is advisory: choose semgrep/AST/tree-sitter/gitleaks/trivy when relevant, do not force static steps for purely dynamic validation tasks"
+ )
task_xml = f"""
@@ -64,6 +213,7 @@ def _run_agent_in_thread(
- All agents share /workspace directory and proxy history for better collaboration
- You can see files created by other agents and proxy traffic from previous work
- Build upon previous work but focus on your specific delegated task
+{wiki_memory_instruction}
"""
@@ -214,14 +364,34 @@ def create_agent(
timeout = None
scan_mode = "deep"
+ is_whitebox = False
interactive = False
if parent_agent and hasattr(parent_agent, "llm_config"):
if hasattr(parent_agent.llm_config, "timeout"):
timeout = parent_agent.llm_config.timeout
if hasattr(parent_agent.llm_config, "scan_mode"):
scan_mode = parent_agent.llm_config.scan_mode
+ if hasattr(parent_agent.llm_config, "is_whitebox"):
+ is_whitebox = parent_agent.llm_config.is_whitebox
interactive = getattr(parent_agent.llm_config, "interactive", False)
+ if is_whitebox:
+ whitebox_guidance = (
+ "\n\nWhite-box execution guidance (recommended when source is available):\n"
+ "- Use structural AST mapping (`sg` or `tree-sitter`) where it helps source analysis; "
+ "keep artifacts bounded and skip forced AST steps for purely dynamic validation tasks.\n"
+ "- Keep AST output bounded: scope to relevant paths/files, avoid whole-repo "
+ "generic function patterns, and cap artifact size.\n"
+ '- Use shared wiki memory by calling list_notes(category="wiki") then '
+ "get_note(note_id=...).\n"
+ '- Before agent_finish, call list_notes(category="wiki") + get_note(note_id=...) '
+ "again, reuse one repo wiki, and call update_note.\n"
+ "- If terminal output contains `command not found` or shell parse errors, "
+ "correct and rerun before using the result."
+ )
+ if "White-box execution guidance (recommended when source is available):" not in task:
+ task = f"{task.rstrip()}{whitebox_guidance}"
+
state = AgentState(
task=task,
agent_name=name,
@@ -229,11 +399,11 @@ def create_agent(
max_iterations=300,
waiting_timeout=300 if interactive else 600,
)
-
llm_config = LLMConfig(
skills=skill_list,
timeout=timeout,
scan_mode=scan_mode,
+ is_whitebox=is_whitebox,
interactive=interactive,
)
@@ -382,6 +552,14 @@ def agent_finish(
"recommendations": final_recommendations or [],
}
+ _append_wiki_update_on_finish(
+ agent_state=agent_state,
+ agent_name=agent_node["name"],
+ result_summary=result_summary,
+ findings=findings,
+ final_recommendations=final_recommendations,
+ )
+
parent_notified = False
if report_to_parent and agent_node["parent_id"]:
diff --git a/strix/tools/notes/__init__.py b/strix/tools/notes/__init__.py
index ebcbbca..8d14123 100644
--- a/strix/tools/notes/__init__.py
+++ b/strix/tools/notes/__init__.py
@@ -1,6 +1,7 @@
from .notes_actions import (
create_note,
delete_note,
+ get_note,
list_notes,
update_note,
)
@@ -9,6 +10,7 @@ from .notes_actions import (
__all__ = [
"create_note",
"delete_note",
+ "get_note",
"list_notes",
"update_note",
]
diff --git a/strix/tools/notes/notes_actions.py b/strix/tools/notes/notes_actions.py
index daab233..450ff35 100644
--- a/strix/tools/notes/notes_actions.py
+++ b/strix/tools/notes/notes_actions.py
@@ -1,11 +1,179 @@
+import json
+import threading
import uuid
from datetime import UTC, datetime
+from pathlib import Path
from typing import Any
from strix.tools.registry import register_tool
_notes_storage: dict[str, dict[str, Any]] = {}
+_VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "wiki"]
+_notes_lock = threading.RLock()
+_loaded_notes_run_dir: str | None = None
+_DEFAULT_CONTENT_PREVIEW_CHARS = 280
+
+
+def _get_run_dir() -> Path | None:
+ try:
+ from strix.telemetry.tracer import get_global_tracer
+
+ tracer = get_global_tracer()
+ if not tracer:
+ return None
+ return tracer.get_run_dir()
+ except (ImportError, OSError, RuntimeError):
+ return None
+
+
+def _get_notes_jsonl_path() -> Path | None:
+ run_dir = _get_run_dir()
+ if not run_dir:
+ return None
+
+ notes_dir = run_dir / "notes"
+ notes_dir.mkdir(parents=True, exist_ok=True)
+ return notes_dir / "notes.jsonl"
+
+
+def _append_note_event(op: str, note_id: str, note: dict[str, Any] | None = None) -> None:
+ notes_path = _get_notes_jsonl_path()
+ if not notes_path:
+ return
+
+ event: dict[str, Any] = {
+ "timestamp": datetime.now(UTC).isoformat(),
+ "op": op,
+ "note_id": note_id,
+ }
+ if note is not None:
+ event["note"] = note
+
+ with notes_path.open("a", encoding="utf-8") as f:
+ f.write(f"{json.dumps(event, ensure_ascii=True)}\n")
+
+
+def _load_notes_from_jsonl(notes_path: Path) -> dict[str, dict[str, Any]]:
+ hydrated: dict[str, dict[str, Any]] = {}
+ if not notes_path.exists():
+ return hydrated
+
+ with notes_path.open(encoding="utf-8") as f:
+ for raw_line in f:
+ line = raw_line.strip()
+ if not line:
+ continue
+
+ try:
+ event = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+
+ op = str(event.get("op", "")).strip().lower()
+ note_id = str(event.get("note_id", "")).strip()
+ if not note_id or op not in {"create", "update", "delete"}:
+ continue
+
+ if op == "delete":
+ hydrated.pop(note_id, None)
+ continue
+
+ note = event.get("note")
+ if not isinstance(note, dict):
+ continue
+
+ existing = hydrated.get(note_id, {})
+ existing.update(note)
+ hydrated[note_id] = existing
+
+ return hydrated
+
+
+def _ensure_notes_loaded() -> None:
+ global _loaded_notes_run_dir # noqa: PLW0603
+
+ run_dir = _get_run_dir()
+ run_dir_key = str(run_dir.resolve()) if run_dir else "__no_run_dir__"
+ if _loaded_notes_run_dir == run_dir_key:
+ return
+
+ _notes_storage.clear()
+
+ notes_path = _get_notes_jsonl_path()
+ if notes_path:
+ _notes_storage.update(_load_notes_from_jsonl(notes_path))
+ try:
+ for note_id, note in _notes_storage.items():
+ if note.get("category") == "wiki":
+ _persist_wiki_note(note_id, note)
+ except OSError:
+ pass
+
+ _loaded_notes_run_dir = run_dir_key
+
+
+def _sanitize_wiki_title(title: str) -> str:
+ cleaned = "".join(ch.lower() if ch.isalnum() else "-" for ch in title.strip())
+ slug = "-".join(part for part in cleaned.split("-") if part)
+ return slug or "wiki-note"
+
+
+def _get_wiki_directory() -> Path | None:
+ try:
+ run_dir = _get_run_dir()
+ if not run_dir:
+ return None
+
+ wiki_dir = run_dir / "wiki"
+ wiki_dir.mkdir(parents=True, exist_ok=True)
+ except OSError:
+ return None
+ else:
+ return wiki_dir
+
+
+def _get_wiki_note_path(note_id: str, note: dict[str, Any]) -> Path | None:
+ wiki_dir = _get_wiki_directory()
+ if not wiki_dir:
+ return None
+
+ wiki_filename = note.get("wiki_filename")
+ if not isinstance(wiki_filename, str) or not wiki_filename.strip():
+ title = note.get("title", "wiki-note")
+ wiki_filename = f"{note_id}-{_sanitize_wiki_title(str(title))}.md"
+ note["wiki_filename"] = wiki_filename
+
+ return wiki_dir / wiki_filename
+
+
+def _persist_wiki_note(note_id: str, note: dict[str, Any]) -> None:
+ wiki_path = _get_wiki_note_path(note_id, note)
+ if not wiki_path:
+ return
+
+ tags = note.get("tags", [])
+ tags_line = ", ".join(str(tag) for tag in tags) if isinstance(tags, list) and tags else "none"
+
+ content = (
+ f"# {note.get('title', 'Wiki Note')}\n\n"
+ f"**Note ID:** {note_id}\n"
+ f"**Created:** {note.get('created_at', '')}\n"
+ f"**Updated:** {note.get('updated_at', '')}\n"
+ f"**Tags:** {tags_line}\n\n"
+ "## Content\n\n"
+ f"{note.get('content', '')}\n"
+ )
+ wiki_path.write_text(content, encoding="utf-8")
+
+
+def _remove_wiki_note(note_id: str, note: dict[str, Any]) -> None:
+ wiki_path = _get_wiki_note_path(note_id, note)
+ if not wiki_path:
+ return
+
+ if wiki_path.exists():
+ wiki_path.unlink()
def _filter_notes(
@@ -13,6 +181,7 @@ def _filter_notes(
tags: list[str] | None = None,
search_query: str | None = None,
) -> list[dict[str, Any]]:
+ _ensure_notes_loaded()
filtered_notes = []
for note_id, note in _notes_storage.items():
@@ -39,50 +208,99 @@ def _filter_notes(
return filtered_notes
+def _to_note_listing_entry(
+ note: dict[str, Any],
+ *,
+ include_content: bool = False,
+) -> dict[str, Any]:
+ entry = {
+ "note_id": note.get("note_id"),
+ "title": note.get("title", ""),
+ "category": note.get("category", "general"),
+ "tags": note.get("tags", []),
+ "created_at": note.get("created_at", ""),
+ "updated_at": note.get("updated_at", ""),
+ }
+
+ wiki_filename = note.get("wiki_filename")
+ if isinstance(wiki_filename, str) and wiki_filename:
+ entry["wiki_filename"] = wiki_filename
+
+ content = str(note.get("content", ""))
+ if include_content:
+ entry["content"] = content
+ elif content:
+ if len(content) > _DEFAULT_CONTENT_PREVIEW_CHARS:
+ entry["content_preview"] = (
+ f"{content[:_DEFAULT_CONTENT_PREVIEW_CHARS].rstrip()}..."
+ )
+ else:
+ entry["content_preview"] = content
+
+ return entry
+
+
@register_tool(sandbox_execution=False)
-def create_note(
+def create_note( # noqa: PLR0911
title: str,
content: str,
category: str = "general",
tags: list[str] | None = None,
) -> dict[str, Any]:
- try:
- if not title or not title.strip():
- return {"success": False, "error": "Title cannot be empty", "note_id": None}
+ with _notes_lock:
+ try:
+ _ensure_notes_loaded()
- if not content or not content.strip():
- return {"success": False, "error": "Content cannot be empty", "note_id": None}
+ if not title or not title.strip():
+ return {"success": False, "error": "Title cannot be empty", "note_id": None}
- valid_categories = ["general", "findings", "methodology", "questions", "plan"]
- if category not in valid_categories:
- return {
- "success": False,
- "error": f"Invalid category. Must be one of: {', '.join(valid_categories)}",
- "note_id": None,
+ if not content or not content.strip():
+ return {"success": False, "error": "Content cannot be empty", "note_id": None}
+
+ if category not in _VALID_NOTE_CATEGORIES:
+ return {
+ "success": False,
+ "error": (
+ f"Invalid category. Must be one of: {', '.join(_VALID_NOTE_CATEGORIES)}"
+ ),
+ "note_id": None,
+ }
+
+ note_id = ""
+ for _ in range(20):
+ candidate = str(uuid.uuid4())[:5]
+ if candidate not in _notes_storage:
+ note_id = candidate
+ break
+ if not note_id:
+ return {"success": False, "error": "Failed to allocate note ID", "note_id": None}
+
+ timestamp = datetime.now(UTC).isoformat()
+
+ note = {
+ "title": title.strip(),
+ "content": content.strip(),
+ "category": category,
+ "tags": tags or [],
+ "created_at": timestamp,
+ "updated_at": timestamp,
}
- note_id = str(uuid.uuid4())[:5]
- timestamp = datetime.now(UTC).isoformat()
+ _notes_storage[note_id] = note
+ _append_note_event("create", note_id, note)
+ if category == "wiki":
+ _persist_wiki_note(note_id, note)
- note = {
- "title": title.strip(),
- "content": content.strip(),
- "category": category,
- "tags": tags or [],
- "created_at": timestamp,
- "updated_at": timestamp,
- }
-
- _notes_storage[note_id] = note
-
- except (ValueError, TypeError) as e:
- return {"success": False, "error": f"Failed to create note: {e}", "note_id": None}
- else:
- return {
- "success": True,
- "note_id": note_id,
- "message": f"Note '{title}' created successfully",
- }
+ except (ValueError, TypeError) as e:
+ return {"success": False, "error": f"Failed to create note: {e}", "note_id": None}
+ except OSError as e:
+ return {"success": False, "error": f"Failed to persist wiki note: {e}", "note_id": None}
+ else:
+ return {
+ "success": True,
+ "note_id": note_id,
+ "message": f"Note '{title}' created successfully",
+ }
@register_tool(sandbox_execution=False)
@@ -90,23 +308,83 @@ def list_notes(
category: str | None = None,
tags: list[str] | None = None,
search: str | None = None,
+ include_content: bool = False,
) -> dict[str, Any]:
- try:
- filtered_notes = _filter_notes(category=category, tags=tags, search_query=search)
+ with _notes_lock:
+ try:
+ filtered_notes = _filter_notes(category=category, tags=tags, search_query=search)
+ notes = [
+ _to_note_listing_entry(note, include_content=include_content)
+ for note in filtered_notes
+ ]
- return {
- "success": True,
- "notes": filtered_notes,
- "total_count": len(filtered_notes),
- }
+ return {
+ "success": True,
+ "notes": notes,
+ "total_count": len(notes),
+ }
- except (ValueError, TypeError) as e:
- return {
- "success": False,
- "error": f"Failed to list notes: {e}",
- "notes": [],
- "total_count": 0,
- }
+ except (ValueError, TypeError) as e:
+ return {
+ "success": False,
+ "error": f"Failed to list notes: {e}",
+ "notes": [],
+ "total_count": 0,
+ }
+
+
+@register_tool(sandbox_execution=False)
+def get_note(note_id: str) -> dict[str, Any]:
+ with _notes_lock:
+ try:
+ _ensure_notes_loaded()
+
+ if not note_id or not note_id.strip():
+ return {
+ "success": False,
+ "error": "Note ID cannot be empty",
+ "note": None,
+ }
+
+ note = _notes_storage.get(note_id)
+ if note is None:
+ return {
+ "success": False,
+ "error": f"Note with ID '{note_id}' not found",
+ "note": None,
+ }
+
+ note_with_id = note.copy()
+ note_with_id["note_id"] = note_id
+
+ except (ValueError, TypeError) as e:
+ return {
+ "success": False,
+ "error": f"Failed to get note: {e}",
+ "note": None,
+ }
+ else:
+ return {"success": True, "note": note_with_id}
+
+
+def append_note_content(note_id: str, delta: str) -> dict[str, Any]:
+ with _notes_lock:
+ try:
+ _ensure_notes_loaded()
+
+ if note_id not in _notes_storage:
+ return {"success": False, "error": f"Note with ID '{note_id}' not found"}
+
+ if not isinstance(delta, str):
+ return {"success": False, "error": "Delta must be a string"}
+
+ note = _notes_storage[note_id]
+ existing_content = str(note.get("content") or "")
+ updated_content = f"{existing_content.rstrip()}{delta}"
+ return update_note(note_id=note_id, content=updated_content)
+
+ except (ValueError, TypeError) as e:
+ return {"success": False, "error": f"Failed to append note content: {e}"}
@register_tool(sandbox_execution=False)
@@ -116,49 +394,66 @@ def update_note(
content: str | None = None,
tags: list[str] | None = None,
) -> dict[str, Any]:
- try:
- if note_id not in _notes_storage:
- return {"success": False, "error": f"Note with ID '{note_id}' not found"}
+ with _notes_lock:
+ try:
+ _ensure_notes_loaded()
- note = _notes_storage[note_id]
+ if note_id not in _notes_storage:
+ return {"success": False, "error": f"Note with ID '{note_id}' not found"}
- if title is not None:
- if not title.strip():
- return {"success": False, "error": "Title cannot be empty"}
- note["title"] = title.strip()
+ note = _notes_storage[note_id]
- if content is not None:
- if not content.strip():
- return {"success": False, "error": "Content cannot be empty"}
- note["content"] = content.strip()
+ if title is not None:
+ if not title.strip():
+ return {"success": False, "error": "Title cannot be empty"}
+ note["title"] = title.strip()
- if tags is not None:
- note["tags"] = tags
+ if content is not None:
+ if not content.strip():
+ return {"success": False, "error": "Content cannot be empty"}
+ note["content"] = content.strip()
- note["updated_at"] = datetime.now(UTC).isoformat()
+ if tags is not None:
+ note["tags"] = tags
- return {
- "success": True,
- "message": f"Note '{note['title']}' updated successfully",
- }
+ note["updated_at"] = datetime.now(UTC).isoformat()
+ _append_note_event("update", note_id, note)
+ if note.get("category") == "wiki":
+ _persist_wiki_note(note_id, note)
- except (ValueError, TypeError) as e:
- return {"success": False, "error": f"Failed to update note: {e}"}
+ return {
+ "success": True,
+ "message": f"Note '{note['title']}' updated successfully",
+ }
+
+ except (ValueError, TypeError) as e:
+ return {"success": False, "error": f"Failed to update note: {e}"}
+ except OSError as e:
+ return {"success": False, "error": f"Failed to persist wiki note: {e}"}
@register_tool(sandbox_execution=False)
def delete_note(note_id: str) -> dict[str, Any]:
- try:
- if note_id not in _notes_storage:
- return {"success": False, "error": f"Note with ID '{note_id}' not found"}
+ with _notes_lock:
+ try:
+ _ensure_notes_loaded()
- note_title = _notes_storage[note_id]["title"]
- del _notes_storage[note_id]
+ if note_id not in _notes_storage:
+ return {"success": False, "error": f"Note with ID '{note_id}' not found"}
- except (ValueError, TypeError) as e:
- return {"success": False, "error": f"Failed to delete note: {e}"}
- else:
- return {
- "success": True,
- "message": f"Note '{note_title}' deleted successfully",
- }
+ note = _notes_storage[note_id]
+ note_title = note["title"]
+ if note.get("category") == "wiki":
+ _remove_wiki_note(note_id, note)
+ del _notes_storage[note_id]
+ _append_note_event("delete", note_id)
+
+ except (ValueError, TypeError) as e:
+ return {"success": False, "error": f"Failed to delete note: {e}"}
+ except OSError as e:
+ return {"success": False, "error": f"Failed to delete wiki note: {e}"}
+ else:
+ return {
+ "success": True,
+ "message": f"Note '{note_title}' deleted successfully",
+ }
diff --git a/strix/tools/notes/notes_actions_schema.xml b/strix/tools/notes/notes_actions_schema.xml
index f47c167..3b186a5 100644
--- a/strix/tools/notes/notes_actions_schema.xml
+++ b/strix/tools/notes/notes_actions_schema.xml
@@ -2,7 +2,8 @@
Create a personal note for observations, findings, and research during the scan.
Use this tool for documenting discoveries, observations, methodology notes, and questions.
- This is your personal notepad for recording information you want to remember or reference later.
+ This is your personal and shared run memory for recording information you want to remember or reference later.
+ Use category "wiki" for repository source maps shared across agents in the same run.
For tracking actionable tasks, use the todo tool instead.
@@ -12,7 +13,7 @@
Content of the note
- Category to organize the note (default: "general", "findings", "methodology", "questions", "plan")
+ Category to organize the note (default: "general", "findings", "methodology", "questions", "plan", "wiki")
Tags for categorization
@@ -92,7 +93,7 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
- List existing notes with optional filtering and search.
+ List existing notes with optional filtering and search (metadata-first by default).
Filter by category
@@ -103,9 +104,12 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
Search query to find in note titles and content
+
+ Include full note content in each list item (default: false)
+
- Response containing: - notes: List of matching notes - total_count: Total number of notes found
+ Response containing: - notes: List of matching notes (metadata + optional content/content_preview) - total_count: Total number of notes found
# List all findings
@@ -122,6 +126,28 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
admin
findings
+
+
+ # Load shared repository wiki notes
+
+ wiki
+
+
+
+
+ Get a single note by ID, including full content.
+
+
+ ID of the note to fetch
+
+
+
+ Response containing: - note: Note object including content - success: Whether note lookup succeeded
+
+
+ # Read a specific wiki note after listing note IDs
+
+ abc12
diff --git a/tests/interface/test_diff_scope.py b/tests/interface/test_diff_scope.py
new file mode 100644
index 0000000..9fe7dd6
--- /dev/null
+++ b/tests/interface/test_diff_scope.py
@@ -0,0 +1,153 @@
+import importlib.util
+from pathlib import Path
+
+import pytest
+
+
+def _load_utils_module():
+ module_path = Path(__file__).resolve().parents[2] / "strix" / "interface" / "utils.py"
+ spec = importlib.util.spec_from_file_location("strix_interface_utils_test", module_path)
+ if spec is None or spec.loader is None:
+ raise RuntimeError("Failed to load strix.interface.utils for tests")
+
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+utils = _load_utils_module()
+
+
+def test_parse_name_status_uses_rename_destination_path() -> None:
+ raw = (
+ b"R100\x00old/path.py\x00new/path.py\x00"
+ b"R75\x00legacy/module.py\x00modern/module.py\x00"
+ b"M\x00src/app.py\x00"
+ b"A\x00src/new_file.py\x00"
+ b"D\x00src/deleted.py\x00"
+ )
+
+ entries = utils._parse_name_status_z(raw)
+ classified = utils._classify_diff_entries(entries)
+
+ assert "new/path.py" in classified["analyzable_files"]
+ assert "old/path.py" not in classified["analyzable_files"]
+ assert "modern/module.py" in classified["analyzable_files"]
+ assert classified["renamed_files"][0]["old_path"] == "old/path.py"
+ assert classified["renamed_files"][0]["new_path"] == "new/path.py"
+ assert "src/deleted.py" in classified["deleted_files"]
+ assert "src/deleted.py" not in classified["analyzable_files"]
+
+
+def test_build_diff_scope_instruction_includes_added_modified_and_deleted_guidance() -> None:
+ scope = utils.RepoDiffScope(
+ source_path="/tmp/repo",
+ workspace_subdir="repo",
+ base_ref="refs/remotes/origin/main",
+ merge_base="abc123",
+ added_files=["src/added.py"],
+ modified_files=["src/changed.py"],
+ renamed_files=[{"old_path": "src/old.py", "new_path": "src/new.py", "similarity": 90}],
+ deleted_files=["src/deleted.py"],
+ analyzable_files=["src/added.py", "src/changed.py", "src/new.py"],
+ )
+
+ instruction = utils.build_diff_scope_instruction([scope])
+
+ assert "For Added files, review the entire file content." in instruction
+ assert "For Modified files, focus primarily on the changed areas." in instruction
+ assert "Note: These files were deleted" in instruction
+ assert "src/deleted.py" in instruction
+ assert "src/old.py -> src/new.py" in instruction
+
+
+def test_resolve_base_ref_prefers_github_base_ref(monkeypatch) -> None:
+ calls: list[str] = []
+
+ def fake_ref_exists(_repo_path: Path, ref: str) -> bool:
+ calls.append(ref)
+ return ref == "refs/remotes/origin/release-2026"
+
+ monkeypatch.setattr(utils, "_git_ref_exists", fake_ref_exists)
+ monkeypatch.setattr(utils, "_extract_github_base_sha", lambda _env: None)
+ monkeypatch.setattr(utils, "_resolve_origin_head_ref", lambda _repo_path: None)
+
+ base_ref = utils._resolve_base_ref(
+ Path("/tmp/repo"),
+ diff_base=None,
+ env={"GITHUB_BASE_REF": "release-2026"},
+ )
+
+ assert base_ref == "refs/remotes/origin/release-2026"
+ assert calls[0] == "refs/remotes/origin/release-2026"
+
+
+def test_resolve_base_ref_falls_back_to_remote_main(monkeypatch) -> None:
+ calls: list[str] = []
+
+ def fake_ref_exists(_repo_path: Path, ref: str) -> bool:
+ calls.append(ref)
+ return ref == "refs/remotes/origin/main"
+
+ monkeypatch.setattr(utils, "_git_ref_exists", fake_ref_exists)
+ monkeypatch.setattr(utils, "_extract_github_base_sha", lambda _env: None)
+ monkeypatch.setattr(utils, "_resolve_origin_head_ref", lambda _repo_path: None)
+
+ base_ref = utils._resolve_base_ref(Path("/tmp/repo"), diff_base=None, env={})
+
+ assert base_ref == "refs/remotes/origin/main"
+ assert "refs/remotes/origin/main" in calls
+ assert "origin/main" not in calls
+
+
+def test_resolve_diff_scope_context_auto_degrades_when_repo_scope_resolution_fails(
+ monkeypatch,
+) -> None:
+ source = {"source_path": "/tmp/repo", "workspace_subdir": "repo"}
+
+ monkeypatch.setattr(utils, "_should_activate_auto_scope", lambda *_args, **_kwargs: True)
+ monkeypatch.setattr(utils, "_is_git_repo", lambda _repo_path: True)
+ monkeypatch.setattr(
+ utils,
+ "_resolve_repo_diff_scope",
+ lambda *_args, **_kwargs: (_ for _ in ()).throw(ValueError("shallow history")),
+ )
+
+ result = utils.resolve_diff_scope_context(
+ local_sources=[source],
+ scope_mode="auto",
+ diff_base=None,
+ non_interactive=True,
+ env={},
+ )
+
+ assert result.active is False
+ assert result.mode == "auto"
+ assert result.metadata["active"] is False
+ assert result.metadata["mode"] == "auto"
+ assert "skipped_diff_scope_sources" in result.metadata
+ assert result.metadata["skipped_diff_scope_sources"] == [
+ "/tmp/repo (diff-scope skipped: shallow history)"
+ ]
+
+
+def test_resolve_diff_scope_context_diff_mode_still_raises_on_repo_scope_resolution_failure(
+ monkeypatch,
+) -> None:
+ source = {"source_path": "/tmp/repo", "workspace_subdir": "repo"}
+
+ monkeypatch.setattr(utils, "_is_git_repo", lambda _repo_path: True)
+ monkeypatch.setattr(
+ utils,
+ "_resolve_repo_diff_scope",
+ lambda *_args, **_kwargs: (_ for _ in ()).throw(ValueError("shallow history")),
+ )
+
+ with pytest.raises(ValueError, match="shallow history"):
+ utils.resolve_diff_scope_context(
+ local_sources=[source],
+ scope_mode="diff",
+ diff_base=None,
+ non_interactive=True,
+ env={},
+ )
diff --git a/tests/llm/test_source_aware_whitebox.py b/tests/llm/test_source_aware_whitebox.py
new file mode 100644
index 0000000..c43a5c4
--- /dev/null
+++ b/tests/llm/test_source_aware_whitebox.py
@@ -0,0 +1,30 @@
+from strix.llm.config import LLMConfig
+from strix.llm.llm import LLM
+
+
+def test_llm_config_whitebox_defaults_to_false(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+ config = LLMConfig()
+ assert config.is_whitebox is False
+
+
+def test_llm_config_whitebox_can_be_enabled(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+ config = LLMConfig(is_whitebox=True)
+ assert config.is_whitebox is True
+
+
+def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+
+ whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=True), agent_name="StrixAgent")
+    assert "<source_aware_coordination>" in whitebox_llm.system_prompt
+    assert "</source_aware_coordination>" in whitebox_llm.system_prompt
+ assert "Begin with fast source triage" in whitebox_llm.system_prompt
+ assert "You MUST begin at the very first step by running the code and testing live." not in (
+ whitebox_llm.system_prompt
+ )
+
+ non_whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=False), agent_name="StrixAgent")
+    assert "<source_aware_coordination>" not in non_whitebox_llm.system_prompt
+    assert "</source_aware_coordination>" not in non_whitebox_llm.system_prompt
diff --git a/tests/tools/test_agents_graph_whitebox.py b/tests/tools/test_agents_graph_whitebox.py
new file mode 100644
index 0000000..de89332
--- /dev/null
+++ b/tests/tools/test_agents_graph_whitebox.py
@@ -0,0 +1,298 @@
+from types import SimpleNamespace
+
+import strix.agents as agents_module
+from strix.llm.config import LLMConfig
+from strix.tools.agents_graph import agents_graph_actions
+
+
+def test_create_agent_inherits_parent_whitebox_flag(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+
+ agents_graph_actions._agent_graph["nodes"].clear()
+ agents_graph_actions._agent_graph["edges"].clear()
+ agents_graph_actions._agent_messages.clear()
+ agents_graph_actions._running_agents.clear()
+ agents_graph_actions._agent_instances.clear()
+ agents_graph_actions._agent_states.clear()
+
+ parent_id = "parent-agent"
+ parent_llm = LLMConfig(timeout=123, scan_mode="standard", is_whitebox=True)
+ agents_graph_actions._agent_instances[parent_id] = SimpleNamespace(
+ llm_config=parent_llm,
+ non_interactive=True,
+ )
+
+ captured_config: dict[str, object] = {}
+
+ class FakeStrixAgent:
+ def __init__(self, config: dict[str, object]):
+ captured_config["agent_config"] = config
+
+ class FakeThread:
+ def __init__(self, target, args, daemon, name):
+ self.target = target
+ self.args = args
+ self.daemon = daemon
+ self.name = name
+
+ def start(self) -> None:
+ return None
+
+ monkeypatch.setattr(agents_module, "StrixAgent", FakeStrixAgent)
+ monkeypatch.setattr(agents_graph_actions.threading, "Thread", FakeThread)
+
+ agent_state = SimpleNamespace(
+ agent_id=parent_id,
+ get_conversation_history=list,
+ )
+ result = agents_graph_actions.create_agent(
+ agent_state=agent_state,
+ task="source-aware child task",
+ name="SourceAwareChild",
+ inherit_context=False,
+ )
+
+ assert result["success"] is True
+ llm_config = captured_config["agent_config"]["llm_config"]
+ assert isinstance(llm_config, LLMConfig)
+ assert llm_config.timeout == 123
+ assert llm_config.scan_mode == "standard"
+ assert llm_config.is_whitebox is True
+ child_task = captured_config["agent_config"]["state"].task
+ assert "White-box execution guidance (recommended when source is available):" in child_task
+ assert "mandatory" not in child_task.lower()
+
+
+def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+
+ agents_graph_actions._agent_graph["nodes"].clear()
+ agents_graph_actions._agent_graph["edges"].clear()
+ agents_graph_actions._agent_messages.clear()
+ agents_graph_actions._running_agents.clear()
+ agents_graph_actions._agent_instances.clear()
+ agents_graph_actions._agent_states.clear()
+
+ parent_id = "parent-1"
+ child_id = "child-1"
+ agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"}
+ agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"}
+
+ class FakeState:
+ def __init__(self) -> None:
+ self.agent_id = child_id
+ self.agent_name = "Child"
+ self.parent_id = parent_id
+ self.task = "analyze source risks"
+ self.stop_requested = False
+ self.messages: list[tuple[str, str]] = []
+
+ def add_message(self, role: str, content: str) -> None:
+ self.messages.append((role, content))
+
+ def model_dump(self) -> dict[str, str]:
+ return {"agent_id": self.agent_id}
+
+ class FakeAgent:
+ def __init__(self) -> None:
+ self.llm_config = LLMConfig(is_whitebox=True)
+
+ async def agent_loop(self, _task: str) -> dict[str, bool]:
+ return {"ok": True}
+
+ state = FakeState()
+ agent = FakeAgent()
+ agents_graph_actions._agent_instances[child_id] = agent
+ result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[])
+
+ assert result["result"] == {"ok": True}
+ task_messages = [msg for role, msg in state.messages if role == "user"]
+ assert task_messages
+ assert 'list_notes(category="wiki")' in task_messages[-1]
+ assert "get_note(note_id=...)" in task_messages[-1]
+ assert "Before agent_finish" in task_messages[-1]
+
+
+def test_agent_finish_appends_wiki_update_for_whitebox(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+
+ agents_graph_actions._agent_graph["nodes"].clear()
+ agents_graph_actions._agent_graph["edges"].clear()
+ agents_graph_actions._agent_messages.clear()
+ agents_graph_actions._running_agents.clear()
+ agents_graph_actions._agent_instances.clear()
+ agents_graph_actions._agent_states.clear()
+
+ parent_id = "parent-2"
+ child_id = "child-2"
+ agents_graph_actions._agent_graph["nodes"][parent_id] = {
+ "name": "Parent",
+ "task": "parent task",
+ "status": "running",
+ "parent_id": None,
+ }
+ agents_graph_actions._agent_graph["nodes"][child_id] = {
+ "name": "Child",
+ "task": "child task",
+ "status": "running",
+ "parent_id": parent_id,
+ }
+ agents_graph_actions._agent_instances[child_id] = SimpleNamespace(
+ llm_config=LLMConfig(is_whitebox=True)
+ )
+
+ captured: dict[str, str] = {}
+
+ def fake_list_notes(category=None):
+ assert category == "wiki"
+ return {
+ "success": True,
+ "notes": [{"note_id": "wiki-note-1", "content": "Existing wiki content"}],
+ "total_count": 1,
+ }
+
+ captured_get: dict[str, str] = {}
+
+ def fake_get_note(note_id: str):
+ captured_get["note_id"] = note_id
+ return {
+ "success": True,
+ "note": {
+ "note_id": note_id,
+ "title": "Repo Wiki",
+ "content": "Existing wiki content",
+ },
+ }
+
+ def fake_append_note_content(note_id: str, delta: str):
+ captured["note_id"] = note_id
+ captured["delta"] = delta
+ return {"success": True, "note_id": note_id}
+
+ monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
+ monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
+ monkeypatch.setattr("strix.tools.notes.notes_actions.append_note_content", fake_append_note_content)
+
+ state = SimpleNamespace(agent_id=child_id, parent_id=parent_id)
+ result = agents_graph_actions.agent_finish(
+ agent_state=state,
+ result_summary="AST pass completed",
+ findings=["Found route sink candidate"],
+ success=True,
+ final_recommendations=["Validate sink with dynamic PoC"],
+ )
+
+ assert result["agent_completed"] is True
+ assert captured_get["note_id"] == "wiki-note-1"
+ assert captured["note_id"] == "wiki-note-1"
+ assert "Agent Update: Child" in captured["delta"]
+ assert "AST pass completed" in captured["delta"]
+
+
+def test_run_agent_in_thread_injects_shared_wiki_context_in_whitebox(monkeypatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
+
+ agents_graph_actions._agent_graph["nodes"].clear()
+ agents_graph_actions._agent_graph["edges"].clear()
+ agents_graph_actions._agent_messages.clear()
+ agents_graph_actions._running_agents.clear()
+ agents_graph_actions._agent_instances.clear()
+ agents_graph_actions._agent_states.clear()
+
+ parent_id = "parent-3"
+ child_id = "child-3"
+ agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"}
+ agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"}
+
+ class FakeState:
+ def __init__(self) -> None:
+ self.agent_id = child_id
+ self.agent_name = "Child"
+ self.parent_id = parent_id
+ self.task = "map source"
+ self.stop_requested = False
+ self.messages: list[tuple[str, str]] = []
+
+ def add_message(self, role: str, content: str) -> None:
+ self.messages.append((role, content))
+
+ def model_dump(self) -> dict[str, str]:
+ return {"agent_id": self.agent_id}
+
+ class FakeAgent:
+ def __init__(self) -> None:
+ self.llm_config = LLMConfig(is_whitebox=True)
+
+ async def agent_loop(self, _task: str) -> dict[str, bool]:
+ return {"ok": True}
+
+ captured_get: dict[str, str] = {}
+
+ def fake_list_notes(category=None):
+ assert category == "wiki"
+ return {
+ "success": True,
+ "notes": [{"note_id": "wiki-ctx-1"}],
+ "total_count": 1,
+ }
+
+ def fake_get_note(note_id: str):
+ captured_get["note_id"] = note_id
+ return {
+ "success": True,
+ "note": {
+ "note_id": note_id,
+ "title": "Shared Repo Wiki",
+ "content": "Architecture: server/client split",
+ },
+ }
+
+ monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
+ monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
+
+ state = FakeState()
+ agent = FakeAgent()
+ agents_graph_actions._agent_instances[child_id] = agent
+ result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[])
+
+ assert result["result"] == {"ok": True}
+ assert captured_get["note_id"] == "wiki-ctx-1"
+ user_messages = [content for role, content in state.messages if role == "user"]
+ assert user_messages
+    assert "<wiki_context>" in user_messages[0]
+
+
+def test_load_primary_wiki_note_selects_repo_tagged_note(monkeypatch) -> None:
+ selected_note_ids: list[str] = []
+
+ def fake_list_notes(category=None):
+ assert category == "wiki"
+ return {
+ "success": True,
+ "notes": [
+ {"note_id": "wiki-other", "tags": ["repo:other"]},
+ {"note_id": "wiki-target", "tags": ["repo:appsmith"]},
+ ],
+ "total_count": 2,
+ }
+
+ def fake_get_note(note_id: str):
+ selected_note_ids.append(note_id)
+ return {
+ "success": True,
+ "note": {"note_id": note_id, "title": "Repo Wiki", "content": "content"},
+ }
+
+ monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
+ monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
+
+ agent_state = SimpleNamespace(
+ task="analyze /workspace/appsmith",
+ context={"whitebox_repo_tags": ["repo:appsmith"]},
+ )
+ note = agents_graph_actions._load_primary_wiki_note(agent_state)
+
+ assert note is not None
+ assert note["note_id"] == "wiki-target"
+ assert selected_note_ids == ["wiki-target"]
diff --git a/tests/tools/test_notes_wiki.py b/tests/tools/test_notes_wiki.py
new file mode 100644
index 0000000..31031e3
--- /dev/null
+++ b/tests/tools/test_notes_wiki.py
@@ -0,0 +1,214 @@
+from pathlib import Path
+
+from strix.telemetry.tracer import Tracer, get_global_tracer, set_global_tracer
+from strix.tools.notes import notes_actions
+
+
+def _reset_notes_state() -> None:
+ notes_actions._notes_storage.clear()
+ notes_actions._loaded_notes_run_dir = None
+
+
+def test_wiki_notes_are_persisted_and_removed(tmp_path: Path, monkeypatch) -> None:
+ monkeypatch.chdir(tmp_path)
+ _reset_notes_state()
+
+ previous_tracer = get_global_tracer()
+ tracer = Tracer("wiki-test-run")
+ set_global_tracer(tracer)
+
+ try:
+ created = notes_actions.create_note(
+ title="Repo Map",
+ content="## Architecture\n- monolith",
+ category="wiki",
+ tags=["source-map"],
+ )
+ assert created["success"] is True
+ note_id = created["note_id"]
+ assert isinstance(note_id, str)
+
+ note = notes_actions._notes_storage[note_id]
+ wiki_filename = note.get("wiki_filename")
+ assert isinstance(wiki_filename, str)
+
+ wiki_path = tmp_path / "strix_runs" / "wiki-test-run" / "wiki" / wiki_filename
+ assert wiki_path.exists()
+ assert "## Architecture" in wiki_path.read_text(encoding="utf-8")
+
+ updated = notes_actions.update_note(
+ note_id=note_id,
+ content="## Architecture\n- service-oriented",
+ )
+ assert updated["success"] is True
+ assert "service-oriented" in wiki_path.read_text(encoding="utf-8")
+
+ deleted = notes_actions.delete_note(note_id=note_id)
+ assert deleted["success"] is True
+ assert wiki_path.exists() is False
+ finally:
+ _reset_notes_state()
+ set_global_tracer(previous_tracer) # type: ignore[arg-type]
+
+
+def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) -> None:
+ monkeypatch.chdir(tmp_path)
+ _reset_notes_state()
+
+ previous_tracer = get_global_tracer()
+ tracer = Tracer("notes-replay-run")
+ set_global_tracer(tracer)
+
+ try:
+ created = notes_actions.create_note(
+ title="Auth findings",
+ content="initial finding",
+ category="findings",
+ tags=["auth"],
+ )
+ assert created["success"] is True
+ note_id = created["note_id"]
+ assert isinstance(note_id, str)
+
+ notes_path = tmp_path / "strix_runs" / "notes-replay-run" / "notes" / "notes.jsonl"
+ assert notes_path.exists() is True
+
+ _reset_notes_state()
+ listed = notes_actions.list_notes(category="findings")
+ assert listed["success"] is True
+ assert listed["total_count"] == 1
+ assert listed["notes"][0]["note_id"] == note_id
+ assert "content" not in listed["notes"][0]
+ assert "content_preview" in listed["notes"][0]
+
+ updated = notes_actions.update_note(note_id=note_id, content="updated finding")
+ assert updated["success"] is True
+
+ _reset_notes_state()
+ listed_after_update = notes_actions.list_notes(search="updated finding")
+ assert listed_after_update["success"] is True
+ assert listed_after_update["total_count"] == 1
+ assert listed_after_update["notes"][0]["note_id"] == note_id
+ assert listed_after_update["notes"][0]["content_preview"] == "updated finding"
+
+ listed_with_content = notes_actions.list_notes(
+ category="findings",
+ include_content=True,
+ )
+ assert listed_with_content["success"] is True
+ assert listed_with_content["total_count"] == 1
+ assert listed_with_content["notes"][0]["content"] == "updated finding"
+
+ deleted = notes_actions.delete_note(note_id=note_id)
+ assert deleted["success"] is True
+
+ _reset_notes_state()
+ listed_after_delete = notes_actions.list_notes(category="findings")
+ assert listed_after_delete["success"] is True
+ assert listed_after_delete["total_count"] == 0
+ finally:
+ _reset_notes_state()
+ set_global_tracer(previous_tracer) # type: ignore[arg-type]
+
+
+def test_get_note_returns_full_note(tmp_path: Path, monkeypatch) -> None:
+ monkeypatch.chdir(tmp_path)
+ _reset_notes_state()
+
+ previous_tracer = get_global_tracer()
+ tracer = Tracer("get-note-run")
+ set_global_tracer(tracer)
+
+ try:
+ created = notes_actions.create_note(
+ title="Repo wiki",
+ content="entrypoints and sinks",
+ category="wiki",
+ tags=["repo:appsmith"],
+ )
+ assert created["success"] is True
+ note_id = created["note_id"]
+ assert isinstance(note_id, str)
+
+ result = notes_actions.get_note(note_id=note_id)
+ assert result["success"] is True
+ assert result["note"]["note_id"] == note_id
+ assert result["note"]["content"] == "entrypoints and sinks"
+ finally:
+ _reset_notes_state()
+ set_global_tracer(previous_tracer) # type: ignore[arg-type]
+
+
+def test_append_note_content_appends_delta(tmp_path: Path, monkeypatch) -> None:
+ monkeypatch.chdir(tmp_path)
+ _reset_notes_state()
+
+ previous_tracer = get_global_tracer()
+ tracer = Tracer("append-note-run")
+ set_global_tracer(tracer)
+
+ try:
+ created = notes_actions.create_note(
+ title="Repo wiki",
+ content="base",
+ category="wiki",
+ tags=["repo:demo"],
+ )
+ assert created["success"] is True
+ note_id = created["note_id"]
+ assert isinstance(note_id, str)
+
+ appended = notes_actions.append_note_content(
+ note_id=note_id,
+ delta="\n\n## Agent Update: worker\nSummary: done",
+ )
+ assert appended["success"] is True
+
+ loaded = notes_actions.get_note(note_id=note_id)
+ assert loaded["success"] is True
+ assert loaded["note"]["content"] == "base\n\n## Agent Update: worker\nSummary: done"
+ finally:
+ _reset_notes_state()
+ set_global_tracer(previous_tracer) # type: ignore[arg-type]
+
+
+def test_list_and_get_note_handle_wiki_repersist_oserror_gracefully(
+ tmp_path: Path, monkeypatch
+) -> None:
+ monkeypatch.chdir(tmp_path)
+ _reset_notes_state()
+
+ previous_tracer = get_global_tracer()
+ tracer = Tracer("wiki-repersist-oserror-run")
+ set_global_tracer(tracer)
+
+ try:
+ created = notes_actions.create_note(
+ title="Repo wiki",
+ content="initial wiki content",
+ category="wiki",
+ tags=["repo:demo"],
+ )
+ assert created["success"] is True
+ note_id = created["note_id"]
+ assert isinstance(note_id, str)
+
+ _reset_notes_state()
+
+ def _raise_oserror(*_args, **_kwargs) -> None:
+ raise OSError("disk full")
+
+ monkeypatch.setattr(notes_actions, "_persist_wiki_note", _raise_oserror)
+
+ listed = notes_actions.list_notes(category="wiki")
+ assert listed["success"] is True
+ assert listed["total_count"] == 1
+ assert listed["notes"][0]["note_id"] == note_id
+
+ fetched = notes_actions.get_note(note_id=note_id)
+ assert fetched["success"] is True
+ assert fetched["note"]["note_id"] == note_id
+ assert fetched["note"]["content"] == "initial wiki content"
+ finally:
+ _reset_notes_state()
+ set_global_tracer(previous_tracer) # type: ignore[arg-type]