10 Commits

Author | SHA1 | Message | Date
bearsyankees | 4f10ae40d7 | whitebox follow up: better wiki | 2026-03-31 16:44:48 -04:00
bearsyankees | c0243367a8 | grep | 2026-03-23 21:38:30 -04:00
bearsyankees | 970bdda6c9 | better sg | 2026-03-23 21:09:22 -04:00
bearsyankees | 861e2f7683 | feat: enhance resolve_diff_scope_context to handle skipped diff-scope sources and add related tests | 2026-03-23 20:52:26 -04:00
bearsyankees | 21f89dd6bd | feat: implement append_note_content function and update related tests | 2026-03-23 20:36:45 -04:00
bearsyankees | 7a06df8e05 | Merge remote-tracking branch 'origin/main' into better-whitebox (conflicts: strix/agents/StrixAgent/strix_agent.py, strix/agents/StrixAgent/system_prompt.jinja) | 2026-03-23 16:46:35 -04:00
bearsyankees | 69a59890ff | Feat: expanded source aware testing | 2026-03-23 16:43:58 -04:00
bearsyankees | b67712beec | Merge origin/main into better-whitebox | 2026-03-19 19:36:17 -06:00
bearsyankees | f65a97f6b2 | STR-39: expand source-aware whitebox workflows and wiki memory | 2026-03-19 19:33:16 -06:00
bearsyankees | afb85c21b1 | feat: Implement diff-scope functionality for pull requests and CI integration | 2026-02-16 23:16:04 -05:00
31 changed files with 2699 additions and 106 deletions

View File

@@ -168,11 +168,17 @@ strix --target https://your-app.com --instruction "Perform authenticated testing
# Multi-target testing (source code + deployed app)
strix -t https://github.com/org/app -t https://your-app.com
# White-box source-aware scan (local repository)
strix --target ./app-directory --scan-mode standard
# Focused testing with custom instructions
strix --target api.your-app.com --instruction "Focus on business logic flaws and IDOR vulnerabilities"
# Provide detailed instructions through file (e.g., rules of engagement, scope, exclusions)
strix --target api.your-app.com --instruction-file ./instruction.md
# Force PR diff-scope against a specific base branch
strix -n --target ./ --scan-mode quick --scope-mode diff --diff-base origin/main
```
### Headless Mode
@@ -198,6 +204,8 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6
        with:
          fetch-depth: 0
      - name: Install Strix
        run: curl -sSL https://strix.ai/install | bash
@@ -210,6 +218,11 @@ jobs:
        run: strix -n -t ./ --scan-mode quick
```
> [!TIP]
> In CI pull request runs, Strix automatically scopes quick reviews to changed files.
> If diff-scope cannot resolve, ensure checkout uses full history (`fetch-depth: 0`) or pass
> `--diff-base` explicitly.
### Configuration
```bash

View File

@@ -97,7 +97,36 @@ RUN mkdir -p /home/pentester/.npm-global
RUN npm install -g retire@latest && \
npm install -g eslint@latest && \
npm install -g js-beautify@latest && \
npm install -g @ast-grep/cli@latest && \
npm install -g tree-sitter-cli@latest
RUN set -eux; \
TS_PARSER_DIR="/home/pentester/.tree-sitter/parsers"; \
mkdir -p "${TS_PARSER_DIR}"; \
for repo in tree-sitter-java tree-sitter-javascript tree-sitter-python tree-sitter-go tree-sitter-bash tree-sitter-json tree-sitter-yaml tree-sitter-typescript; do \
if [ "$repo" = "tree-sitter-yaml" ]; then \
repo_url="https://github.com/tree-sitter-grammars/${repo}.git"; \
else \
repo_url="https://github.com/tree-sitter/${repo}.git"; \
fi; \
if [ ! -d "${TS_PARSER_DIR}/${repo}" ]; then \
git clone --depth 1 "${repo_url}" "${TS_PARSER_DIR}/${repo}"; \
fi; \
done; \
if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" ]; then \
ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" "${TS_PARSER_DIR}/tree-sitter-typescript-typescript"; \
fi; \
if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" ]; then \
ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" "${TS_PARSER_DIR}/tree-sitter-typescript-tsx"; \
fi; \
tree-sitter init-config >/dev/null 2>&1 || true; \
TS_CONFIG="/home/pentester/.config/tree-sitter/config.json"; \
mkdir -p "$(dirname "${TS_CONFIG}")"; \
[ -f "${TS_CONFIG}" ] || printf '{}\n' > "${TS_CONFIG}"; \
TMP_CFG="$(mktemp)"; \
jq --arg p "${TS_PARSER_DIR}" '.["parser-directories"] = ((.["parser-directories"] // []) + [$p] | unique)' "${TS_CONFIG}" > "${TMP_CFG}"; \
mv "${TMP_CFG}" "${TS_CONFIG}"
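The `jq` step above appends the parser directory to tree-sitter's `parser-directories` list without clobbering entries that may already be in the config. A rough Python equivalent of that merge (order-preserving rather than `jq`'s sorted `unique`, and with a hypothetical path):

```python
import json


def add_parser_dir(config: dict, parser_dir: str) -> dict:
    # Mirror the jq expression: append the directory, then de-duplicate.
    dirs = config.get("parser-directories", []) or []
    if parser_dir not in dirs:
        dirs = [*dirs, parser_dir]
    config["parser-directories"] = dirs
    return config


cfg = json.loads("{}")  # stands in for a fresh config.json
cfg = add_parser_dir(cfg, "/home/pentester/.tree-sitter/parsers")
cfg = add_parser_dir(cfg, "/home/pentester/.tree-sitter/parsers")  # idempotent
print(json.dumps(cfg))
```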
WORKDIR /home/pentester/tools
RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
@@ -110,6 +139,18 @@ RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
USER root
RUN curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin
RUN set -eux; \
ARCH="$(uname -m)"; \
case "$ARCH" in \
x86_64) GITLEAKS_ARCH="x64" ;; \
aarch64|arm64) GITLEAKS_ARCH="arm64" ;; \
*) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \
esac; \
TAG="$(curl -fsSL https://api.github.com/repos/gitleaks/gitleaks/releases/latest | jq -r .tag_name)"; \
curl -fsSL "https://github.com/gitleaks/gitleaks/releases/download/${TAG}/gitleaks_${TAG#v}_linux_${GITLEAKS_ARCH}.tar.gz" -o /tmp/gitleaks.tgz; \
tar -xzf /tmp/gitleaks.tgz -C /tmp; \
install -m 0755 /tmp/gitleaks /usr/local/bin/gitleaks; \
rm -f /tmp/gitleaks /tmp/gitleaks.tgz
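The gitleaks step maps `uname -m` output onto the architecture suffix used by the release archive names and strips the leading `v` from the tag (`${TAG#v}`). A small Python sketch of that naming rule (the tag value below is illustrative, not pinned by this Dockerfile):

```python
def gitleaks_asset(tag: str, machine: str) -> str:
    # Map `uname -m` to the gitleaks release-archive suffix, rejecting
    # anything outside the two supported architectures, as the RUN step does.
    arch_map = {"x86_64": "x64", "aarch64": "arm64", "arm64": "arm64"}
    try:
        arch = arch_map[machine]
    except KeyError:
        raise ValueError(f"Unsupported architecture: {machine}") from None
    version = tag.removeprefix("v")  # ${TAG#v}
    return f"gitleaks_{version}_linux_{arch}.tar.gz"


print(gitleaks_asset("v8.18.4", "x86_64"))
```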
RUN apt-get update && apt-get install -y zaproxy

View File

@@ -13,6 +13,12 @@ Use the `-n` or `--non-interactive` flag:
strix -n --target ./app --scan-mode quick
```
For pull-request style CI runs, Strix automatically scopes quick scans to changed files. You can force this behavior and set a base ref explicitly:
```bash
strix -n --target ./app --scan-mode quick --scope-mode diff --diff-base origin/main
```
## Exit Codes
| Code | Meaning |
@@ -78,3 +84,7 @@ jobs:
<Note>
All CI platforms require Docker access. Ensure your runner has Docker available.
</Note>
<Tip>
If diff-scope fails in CI, fetch full git history (for example, `fetch-depth: 0` in GitHub Actions) so merge-base and branch comparison can be resolved.
</Tip>

View File

@@ -18,6 +18,8 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Install Strix
        run: curl -sSL https://strix.ai/install | bash
@@ -58,3 +60,7 @@ The workflow fails when vulnerabilities are found:
<Tip>
Use `quick` mode for PRs to keep feedback fast. Schedule `deep` scans nightly.
</Tip>
<Note>
For pull_request workflows, Strix automatically uses changed-files diff-scope in CI/headless runs. If diff resolution fails, ensure full history is fetched (`fetch-depth: 0`) or set `--diff-base`.
</Note>

View File

@@ -45,13 +45,21 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o
| [js-beautify](https://github.com/beautifier/js-beautify) | JavaScript deobfuscation |
| [JSHint](https://jshint.com) | JavaScript code quality tool |
## Source-Aware Analysis
| Tool | Description |
| ------------------------------------------------------- | --------------------------------------------- |
| [Semgrep](https://github.com/semgrep/semgrep) | Fast SAST and custom rule matching |
| [ast-grep](https://ast-grep.github.io) | Structural AST/CST-aware code search (`sg`) |
| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction (Java/JS/TS/Python/Go/Bash/JSON/YAML grammars pre-configured) |
| [Bandit](https://bandit.readthedocs.io) | Python security linter |
## Secret Detection
| Tool | Description |
| ----------------------------------------------------------- | ------------------------------------- |
| [TruffleHog](https://github.com/trufflesecurity/trufflehog) | Find secrets in code and history |
| [Semgrep](https://github.com/semgrep/semgrep) | Static analysis for security patterns |
| [Bandit](https://bandit.readthedocs.io) | Python security linter |
| [Gitleaks](https://github.com/gitleaks/gitleaks) | Detect hardcoded secrets in repositories |
## Authentication Testing
@@ -64,7 +72,7 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o
| Tool | Description |
| -------------------------- | ---------------------------------------------- |
| [Trivy](https://trivy.dev) | Filesystem/container scanning for vulns, misconfigurations, secrets, and licenses |
## HTTP Proxy

View File

@@ -32,14 +32,18 @@ sqlmap -u "https://example.com/page?id=1"
### Code Analysis
```bash
# Fast SAST triage
semgrep --config auto ./src
# Structural AST search
sg scan ./src
# Secret detection
gitleaks detect --source ./
trufflehog filesystem ./
# Supply-chain and misconfiguration checks
trivy fs ./
```
### Custom Scripts

View File

@@ -27,6 +27,14 @@ strix --target <target> [options]
Scan depth: `quick`, `standard`, or `deep`.
</ParamField>
<ParamField path="--scope-mode" type="string" default="auto">
Code scope mode: `auto` (enable PR diff-scope in CI/headless runs), `diff` (force changed-files scope), or `full` (disable diff-scope).
</ParamField>
<ParamField path="--diff-base" type="string">
Target branch or commit to compare against (e.g., `origin/main`). Defaults to the repository's default branch.
</ParamField>
<ParamField path="--non-interactive, -n" type="boolean">
Run in headless mode without TUI. Ideal for CI/CD.
</ParamField>
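As a rough sketch of how `--scope-mode` interacts with CI/headless detection (simplified; the real resolution also inspects git state and PR environment variables):

```python
def diff_scope_enabled(scope_mode: str, in_ci: bool, non_interactive: bool) -> bool:
    # 'diff' always forces changed-files scope, 'full' always disables it,
    # and 'auto' enables it only for CI plus headless runs.
    if scope_mode == "diff":
        return True
    if scope_mode == "full":
        return False
    return in_ci and non_interactive


assert diff_scope_enabled("auto", in_ci=True, non_interactive=True)
assert not diff_scope_enabled("auto", in_ci=False, non_interactive=False)
assert diff_scope_enabled("diff", in_ci=False, non_interactive=False)
```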
@@ -50,6 +58,9 @@ strix --target api.example.com --instruction "Focus on IDOR and auth bypass"
# CI/CD mode
strix -n --target ./ --scan-mode quick
# Force diff-scope against a specific base ref
strix -n --target ./ --scan-mode quick --scope-mode diff --diff-base origin/main
# Multi-target white-box testing
strix -t https://github.com/org/app -t https://staging.example.com
```

View File

@@ -31,6 +31,8 @@ Balanced testing for routine security reviews. Best for:
**Duration**: 30 minutes to 1 hour
**White-box behavior**: Uses source-aware mapping and static triage to prioritize dynamic exploit validation paths.
## Deep
```bash
@@ -44,6 +46,8 @@ Thorough penetration testing. Best for:
**Duration**: 1-4 hours depending on target complexity
**White-box behavior**: Runs broad source-aware triage (`semgrep`, AST structural search, secrets, supply-chain checks) and then systematically validates top candidates dynamically.
<Note>
Deep mode is the default. It explores edge cases, chained vulnerabilities, and complex attack paths.
</Note>

View File

@@ -59,6 +59,7 @@ class StrixAgent(BaseAgent):
    async def execute_scan(self, scan_config: dict[str, Any]) -> dict[str, Any]:  # noqa: PLR0912
        user_instructions = scan_config.get("user_instructions", "")
        targets = scan_config.get("targets", [])
        diff_scope = scan_config.get("diff_scope", {}) or {}
        self.llm.set_system_prompt_context(self._build_system_scope_context(scan_config))
        repositories = []
@@ -120,6 +121,28 @@ class StrixAgent(BaseAgent):
        task_parts.append("\n\nIP Addresses:")
        task_parts.extend(f"- {ip}" for ip in ip_addresses)
        if diff_scope.get("active"):
            task_parts.append("\n\nScope Constraints:")
            task_parts.append(
                "- Pull request diff-scope mode is active. Prioritize changed files "
                "and use other files only for context."
            )
            for repo_scope in diff_scope.get("repos", []):
                repo_label = (
                    repo_scope.get("workspace_subdir")
                    or repo_scope.get("source_path")
                    or "repository"
                )
                changed_count = repo_scope.get("analyzable_files_count", 0)
                deleted_count = repo_scope.get("deleted_files_count", 0)
                task_parts.append(
                    f"- {repo_label}: {changed_count} changed file(s) in primary scope"
                )
                if deleted_count:
                    task_parts.append(
                        f"- {repo_label}: {deleted_count} deleted file(s) are context-only"
                    )
        task_description = " ".join(task_parts)
        if user_instructions:
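The scope-constraint block above can be exercised in isolation. A condensed standalone sketch of the lines it appends, fed a hypothetical `diff_scope` payload:

```python
def scope_constraint_lines(diff_scope: dict) -> list[str]:
    # Condensed version of the task_parts additions in execute_scan.
    lines: list[str] = []
    if not diff_scope.get("active"):
        return lines
    lines.append("Scope Constraints:")
    for repo in diff_scope.get("repos", []):
        label = repo.get("workspace_subdir") or repo.get("source_path") or "repository"
        changed = repo.get("analyzable_files_count", 0)
        deleted = repo.get("deleted_files_count", 0)
        lines.append(f"- {label}: {changed} changed file(s) in primary scope")
        if deleted:
            lines.append(f"- {label}: {deleted} deleted file(s) are context-only")
    return lines


demo_scope = {
    "active": True,
    "repos": [
        {"workspace_subdir": "app", "analyzable_files_count": 3, "deleted_files_count": 1}
    ],
}
print(scope_constraint_lines(demo_scope))
```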

View File

@@ -111,12 +111,18 @@ BLACK-BOX TESTING (domain/subdomain only):
WHITE-BOX TESTING (code provided):
- MUST perform BOTH static AND dynamic analysis
- Static: Use source-aware triage first to map risk quickly (`semgrep`, `ast-grep`, Tree-sitter tooling, `gitleaks`, `trufflehog`, `trivy fs`). Then review code for vulnerabilities
- Static coverage floor: execute at least one structural AST mapping pass (`sg` and/or Tree-sitter) per repository and keep artifact output
- Static coverage target per repository: run one `semgrep` pass, one secrets pass (`gitleaks` and/or `trufflehog`), one `trivy fs` pass, and one AST-structural pass (`sg` and/or Tree-sitter); if any are skipped, record why in the shared wiki
- Keep AST artifacts bounded and high-signal: scope to relevant paths/hypotheses, avoid whole-repo generic function dumps
- AST target selection rule: build `sg-targets.txt` from `semgrep.json` scope first (`paths.scanned`, fallback to unique `results[].path`), then run `xargs ... sg run` against that file list. Only use path-heuristic fallback if semgrep scope is unavailable, and log fallback reason in the wiki.
- Shared memory: Use notes as shared working memory; discover wiki notes with `list_notes`, read `wiki:overview` first when available, then read `wiki:security` via `get_note(note_id=...)` before analysis
- Before `agent_finish`/`finish_scan`, update `wiki:security` with scanner summaries, key routes/sinks, and dynamic follow-up plan
- Dynamic: Run the application and test live to validate exploitability
- NEVER rely solely on static code analysis when dynamic validation is possible
- Begin with fast source triage and dynamic run preparation in parallel; use static findings to prioritize live testing.
- Local execution, unit/integration testing, patch verification, and HTTP requests against locally started in-scope services are normal authorized white-box validation
- If dynamically running the code proves impossible after exhaustive attempts, pivot to comprehensive static analysis.
- Try to infer how to run the code based on its structure and content.
- FIX discovered vulnerabilities in code in same file.
- Test patches to confirm vulnerability removal.
@@ -460,8 +466,12 @@ JAVASCRIPT ANALYSIS:
CODE ANALYSIS:
- semgrep - Static analysis/SAST
- ast-grep (sg) - Structural AST/CST-aware code search
- tree-sitter - Syntax-aware parsing and symbol extraction support
- bandit - Python security linter
- trufflehog - Secret detection in code
- gitleaks - Secret detection in repository content/history
- trivy fs - Filesystem vulnerability/misconfiguration/license/secret scanning
SPECIALIZED TOOLS:
- jwt_tool - JWT token manipulation

View File

@@ -72,9 +72,13 @@ async def run_cli(args: Any) -> None: # noqa: PLR0915
        "targets": args.targets_info,
        "user_instructions": args.instruction or "",
        "run_name": args.run_name,
        "diff_scope": getattr(args, "diff_scope", {"active": False}),
    }
    llm_config = LLMConfig(
        scan_mode=scan_mode,
        is_whitebox=bool(getattr(args, "local_sources", [])),
    )
    agent_config = {
        "llm_config": llm_config,
        "max_iterations": 300,

View File

from strix.interface.utils import (  # noqa: E402
    image_exists,
    infer_target_type,
    process_pull_line,
    resolve_diff_scope_context,
    rewrite_localhost_targets,
    validate_config_file,
    validate_llm_response,
@@ -357,6 +358,28 @@ Examples:
        ),
    )
    parser.add_argument(
        "--scope-mode",
        type=str,
        choices=["auto", "diff", "full"],
        default="auto",
        help=(
            "Scope mode for code targets: "
            "'auto' enables PR diff-scope in CI/headless runs, "
            "'diff' forces changed-files scope, "
            "'full' disables diff-scope."
        ),
    )
    parser.add_argument(
        "--diff-base",
        type=str,
        help=(
            "Target branch or commit to compare against (e.g., origin/main). "
            "Defaults to the repository's default branch."
        ),
    )
    parser.add_argument(
        "--config",
        type=str,
@@ -514,7 +537,7 @@ def persist_config() -> None:
    save_current_config()


def main() -> None:  # noqa: PLR0912, PLR0915
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -541,6 +564,38 @@ def main() -> None:
            target_info["details"]["cloned_repo_path"] = cloned_path
    args.local_sources = collect_local_sources(args.targets_info)
    try:
        diff_scope = resolve_diff_scope_context(
            local_sources=args.local_sources,
            scope_mode=args.scope_mode,
            diff_base=args.diff_base,
            non_interactive=args.non_interactive,
        )
    except ValueError as e:
        console = Console()
        error_text = Text()
        error_text.append("DIFF SCOPE RESOLUTION FAILED", style="bold red")
        error_text.append("\n\n", style="white")
        error_text.append(str(e), style="white")
        panel = Panel(
            error_text,
            title="[bold white]STRIX",
            title_align="left",
            border_style="red",
            padding=(1, 2),
        )
        console.print("\n")
        console.print(panel)
        console.print()
        sys.exit(1)
    args.diff_scope = diff_scope.metadata
    if diff_scope.instruction_block:
        if args.instruction:
            args.instruction = f"{diff_scope.instruction_block}\n\n{args.instruction}"
        else:
            args.instruction = diff_scope.instruction_block
    is_whitebox = bool(args.local_sources)

View File

@@ -117,6 +117,8 @@ class ListNotesRenderer(BaseToolRenderer):
            title = note.get("title", "").strip() or "(untitled)"
            category = note.get("category", "general")
            note_content = note.get("content", "").strip()
            if not note_content:
                note_content = note.get("content_preview", "").strip()
            text.append("\n - ")
            text.append(title)
@@ -131,3 +133,35 @@ class ListNotesRenderer(BaseToolRenderer):
        css_classes = cls.get_css_classes("completed")
        return Static(text, classes=css_classes)


@register_tool_renderer
class GetNoteRenderer(BaseToolRenderer):
    tool_name: ClassVar[str] = "get_note"
    css_classes: ClassVar[list[str]] = ["tool-call", "notes-tool"]

    @classmethod
    def render(cls, tool_data: dict[str, Any]) -> Static:
        result = tool_data.get("result")
        text = Text()
        text.append("", style="#fbbf24")
        text.append("note read", style="dim")
        if result and isinstance(result, dict) and result.get("success"):
            note = result.get("note", {}) or {}
            title = str(note.get("title", "")).strip() or "(untitled)"
            category = note.get("category", "general")
            content = str(note.get("content", "")).strip()
            text.append("\n ")
            text.append(title)
            text.append(f" ({category})", style="dim")
            if content:
                text.append("\n ")
                text.append(content, style="dim")
        else:
            text.append("\n ")
            text.append("Loading...", style="dim")
        css_classes = cls.get_css_classes("completed")
        return Static(text, classes=css_classes)

View File

@@ -742,11 +742,16 @@ class StrixTUIApp(App): # type: ignore[misc]
            "targets": args.targets_info,
            "user_instructions": args.instruction or "",
            "run_name": args.run_name,
            "diff_scope": getattr(args, "diff_scope", {"active": False}),
        }

    def _build_agent_config(self, args: argparse.Namespace) -> dict[str, Any]:
        scan_mode = getattr(args, "scan_mode", "deep")
        llm_config = LLMConfig(
            scan_mode=scan_mode,
            interactive=True,
            is_whitebox=bool(getattr(args, "local_sources", [])),
        )
        config = {
            "llm_config": llm_config,

View File

@@ -1,11 +1,13 @@
import ipaddress
import json
import os
import re
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
@@ -461,6 +463,612 @@ def generate_run_name(targets_info: list[dict[str, Any]] | None = None) -> str:
# Target processing utilities
_SUPPORTED_SCOPE_MODES = {"auto", "diff", "full"}
_MAX_FILES_PER_SECTION = 120
@dataclass
class DiffEntry:
    status: str
    path: str
    old_path: str | None = None
    similarity: int | None = None


@dataclass
class RepoDiffScope:
    source_path: str
    workspace_subdir: str | None
    base_ref: str
    merge_base: str
    added_files: list[str]
    modified_files: list[str]
    renamed_files: list[dict[str, Any]]
    deleted_files: list[str]
    analyzable_files: list[str]
    truncated_sections: dict[str, bool] = field(default_factory=dict)

    def to_metadata(self) -> dict[str, Any]:
        return {
            "source_path": self.source_path,
            "workspace_subdir": self.workspace_subdir,
            "base_ref": self.base_ref,
            "merge_base": self.merge_base,
            "added_files": self.added_files,
            "modified_files": self.modified_files,
            "renamed_files": self.renamed_files,
            "deleted_files": self.deleted_files,
            "analyzable_files": self.analyzable_files,
            "added_files_count": len(self.added_files),
            "modified_files_count": len(self.modified_files),
            "renamed_files_count": len(self.renamed_files),
            "deleted_files_count": len(self.deleted_files),
            "analyzable_files_count": len(self.analyzable_files),
            "truncated_sections": self.truncated_sections,
        }


@dataclass
class DiffScopeResult:
    active: bool
    mode: str
    instruction_block: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
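A quick smoke test of the metadata shape produced by `to_metadata`, using a trimmed-down copy of the dataclass above (only two of the file lists are kept; the file names are hypothetical):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RepoDiffScopeDemo:
    # Trimmed-down stand-in for RepoDiffScope, enough to show the shape.
    added_files: list[str]
    deleted_files: list[str]
    truncated_sections: dict[str, bool] = field(default_factory=dict)

    def to_metadata(self) -> dict[str, Any]:
        # Counts are derived from the lists, mirroring to_metadata above.
        return {
            "added_files": self.added_files,
            "deleted_files": self.deleted_files,
            "added_files_count": len(self.added_files),
            "deleted_files_count": len(self.deleted_files),
            "truncated_sections": self.truncated_sections,
        }


meta = RepoDiffScopeDemo(added_files=["src/auth.py"], deleted_files=[]).to_metadata()
print(meta["added_files_count"], meta["deleted_files_count"])
```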
def _run_git_command(
    repo_path: Path, args: list[str], check: bool = True
) -> subprocess.CompletedProcess[str]:
    return subprocess.run(  # noqa: S603
        ["git", "-C", str(repo_path), *args],  # noqa: S607
        capture_output=True,
        text=True,
        check=check,
    )


def _run_git_command_raw(
    repo_path: Path, args: list[str], check: bool = True
) -> subprocess.CompletedProcess[bytes]:
    return subprocess.run(  # noqa: S603
        ["git", "-C", str(repo_path), *args],  # noqa: S607
        capture_output=True,
        check=check,
    )


def _is_ci_environment(env: dict[str, str]) -> bool:
    return any(
        env.get(key)
        for key in (
            "CI",
            "GITHUB_ACTIONS",
            "GITLAB_CI",
            "JENKINS_URL",
            "BUILDKITE",
            "CIRCLECI",
        )
    )


def _is_pr_environment(env: dict[str, str]) -> bool:
    return any(
        env.get(key)
        for key in (
            "GITHUB_BASE_REF",
            "GITHUB_HEAD_REF",
            "CI_MERGE_REQUEST_TARGET_BRANCH_NAME",
            "GITLAB_MERGE_REQUEST_TARGET_BRANCH_NAME",
            "SYSTEM_PULLREQUEST_TARGETBRANCH",
        )
    )


def _is_git_repo(repo_path: Path) -> bool:
    result = _run_git_command(repo_path, ["rev-parse", "--is-inside-work-tree"], check=False)
    return result.returncode == 0 and result.stdout.strip().lower() == "true"


def _is_repo_shallow(repo_path: Path) -> bool:
    result = _run_git_command(repo_path, ["rev-parse", "--is-shallow-repository"], check=False)
    if result.returncode == 0:
        value = result.stdout.strip().lower()
        if value in {"true", "false"}:
            return value == "true"
    git_meta = repo_path / ".git"
    if git_meta.is_dir():
        return (git_meta / "shallow").exists()
    if git_meta.is_file():
        try:
            content = git_meta.read_text(encoding="utf-8").strip()
        except OSError:
            return False
        if content.startswith("gitdir:"):
            git_dir = content.split(":", 1)[1].strip()
            resolved = (repo_path / git_dir).resolve()
            return (resolved / "shallow").exists()
    return False


def _git_ref_exists(repo_path: Path, ref: str) -> bool:
    result = _run_git_command(repo_path, ["rev-parse", "--verify", "--quiet", ref], check=False)
    return result.returncode == 0


def _resolve_origin_head_ref(repo_path: Path) -> str | None:
    result = _run_git_command(
        repo_path, ["symbolic-ref", "--quiet", "refs/remotes/origin/HEAD"], check=False
    )
    if result.returncode != 0:
        return None
    ref = result.stdout.strip()
    return ref or None


def _extract_branch_name(ref: str | None) -> str | None:
    if not ref:
        return None
    value = ref.strip()
    if not value:
        return None
    return value.split("/")[-1]
def _extract_github_base_sha(env: dict[str, str]) -> str | None:
    event_path = env.get("GITHUB_EVENT_PATH", "").strip()
    if not event_path:
        return None
    path = Path(event_path)
    if not path.exists():
        return None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None
    base_sha = payload.get("pull_request", {}).get("base", {}).get("sha")
    if isinstance(base_sha, str) and base_sha.strip():
        return base_sha.strip()
    return None


def _resolve_default_branch_name(repo_path: Path, env: dict[str, str]) -> str | None:
    github_base_ref = env.get("GITHUB_BASE_REF", "").strip()
    if github_base_ref:
        return github_base_ref
    origin_head = _resolve_origin_head_ref(repo_path)
    if origin_head:
        branch = _extract_branch_name(origin_head)
        if branch:
            return branch
    if _git_ref_exists(repo_path, "refs/remotes/origin/main"):
        return "main"
    if _git_ref_exists(repo_path, "refs/remotes/origin/master"):
        return "master"
    return None


def _resolve_base_ref(repo_path: Path, diff_base: str | None, env: dict[str, str]) -> str:
    if diff_base and diff_base.strip():
        return diff_base.strip()
    github_base_ref = env.get("GITHUB_BASE_REF", "").strip()
    if github_base_ref:
        github_candidate = f"refs/remotes/origin/{github_base_ref}"
        if _git_ref_exists(repo_path, github_candidate):
            return github_candidate
    github_base_sha = _extract_github_base_sha(env)
    if github_base_sha and _git_ref_exists(repo_path, github_base_sha):
        return github_base_sha
    origin_head = _resolve_origin_head_ref(repo_path)
    if origin_head and _git_ref_exists(repo_path, origin_head):
        return origin_head
    if _git_ref_exists(repo_path, "refs/remotes/origin/main"):
        return "refs/remotes/origin/main"
    if _git_ref_exists(repo_path, "refs/remotes/origin/master"):
        return "refs/remotes/origin/master"
    raise ValueError(
        "Unable to resolve a base ref for diff-scope. Pass --diff-base explicitly "
        "(for example: --diff-base origin/main)."
    )


def _get_current_branch_name(repo_path: Path) -> str | None:
    result = _run_git_command(repo_path, ["rev-parse", "--abbrev-ref", "HEAD"], check=False)
    if result.returncode != 0:
        return None
    branch_name = result.stdout.strip()
    if not branch_name or branch_name == "HEAD":
        return None
    return branch_name
def _parse_name_status_z(raw_output: bytes) -> list[DiffEntry]:
    if not raw_output:
        return []
    tokens = [
        token.decode("utf-8", errors="replace") for token in raw_output.split(b"\x00") if token
    ]
    entries: list[DiffEntry] = []
    index = 0
    while index < len(tokens):
        token = tokens[index]
        status_raw = token
        status_code = status_raw[:1]
        similarity: int | None = None
        if len(status_raw) > 1 and status_raw[1:].isdigit():
            similarity = int(status_raw[1:])
        # Git's -z output for --name-status is:
        #   - non-rename/copy: <status>\0<path>\0
        #   - rename/copy: <statusN>\0<old_path>\0<new_path>\0
        if status_code in {"R", "C"} and index + 2 < len(tokens):
            old_path = tokens[index + 1]
            new_path = tokens[index + 2]
            entries.append(
                DiffEntry(
                    status=status_code,
                    path=new_path,
                    old_path=old_path,
                    similarity=similarity,
                )
            )
            index += 3
            continue
        if index + 1 < len(tokens):
            path = tokens[index + 1]
            entries.append(DiffEntry(status=status_code, path=path, similarity=similarity))
            index += 2
            continue
        # Backward-compat fallback if output is tab-delimited unexpectedly.
        status_fallback, has_tab, first_path = token.partition("\t")
        if not has_tab:
            break
        fallback_code = status_fallback[:1]
        fallback_similarity: int | None = None
        if len(status_fallback) > 1 and status_fallback[1:].isdigit():
            fallback_similarity = int(status_fallback[1:])
        entries.append(
            DiffEntry(status=fallback_code, path=first_path, similarity=fallback_similarity)
        )
        index += 1
    return entries


def _append_unique(container: list[str], seen: set[str], path: str) -> None:
    if path and path not in seen:
        seen.add(path)
        container.append(path)
def _classify_diff_entries(entries: list[DiffEntry]) -> dict[str, Any]:
    added_files: list[str] = []
    modified_files: list[str] = []
    deleted_files: list[str] = []
    renamed_files: list[dict[str, Any]] = []
    analyzable_files: list[str] = []
    analyzable_seen: set[str] = set()
    modified_seen: set[str] = set()
    for entry in entries:
        path = entry.path
        if not path:
            continue
        if entry.status == "D":
            deleted_files.append(path)
            continue
        if entry.status == "A":
            added_files.append(path)
            _append_unique(analyzable_files, analyzable_seen, path)
            continue
        if entry.status == "M":
            _append_unique(modified_files, modified_seen, path)
            _append_unique(analyzable_files, analyzable_seen, path)
            continue
        if entry.status == "R":
            renamed_files.append(
                {
                    "old_path": entry.old_path,
                    "new_path": path,
                    "similarity": entry.similarity,
                }
            )
            _append_unique(analyzable_files, analyzable_seen, path)
            if entry.similarity is None or entry.similarity < 100:
                _append_unique(modified_files, modified_seen, path)
            continue
        if entry.status == "C":
            _append_unique(modified_files, modified_seen, path)
            _append_unique(analyzable_files, analyzable_seen, path)
            continue
        _append_unique(modified_files, modified_seen, path)
        _append_unique(analyzable_files, analyzable_seen, path)
    return {
        "added_files": added_files,
        "modified_files": modified_files,
        "deleted_files": deleted_files,
        "renamed_files": renamed_files,
        "analyzable_files": analyzable_files,
    }


def _truncate_file_list(
    files: list[str], max_files: int = _MAX_FILES_PER_SECTION
) -> tuple[list[str], bool]:
    if len(files) <= max_files:
        return files, False
    return files[:max_files], True
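The core of the classification above is that deletions are context-only while everything else stays analyzable. A condensed sketch of that rule (statuses and paths are illustrative; renames, copies, and de-duplication are omitted):

```python
def classify(entries: list[tuple[str, str]]) -> dict[str, list[str]]:
    # Condensed view of _classify_diff_entries: 'D' paths go to deleted only;
    # 'A' and 'M' paths are analyzable.
    out: dict[str, list[str]] = {"added": [], "modified": [], "deleted": [], "analyzable": []}
    for status, path in entries:
        if status == "D":
            out["deleted"].append(path)
            continue
        if status == "A":
            out["added"].append(path)
        else:
            out["modified"].append(path)
        out["analyzable"].append(path)
    return out


print(classify([("A", "new.py"), ("M", "app.py"), ("D", "gone.py")]))
```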
def build_diff_scope_instruction(scopes: list[RepoDiffScope]) -> str:  # noqa: PLR0912
    lines = [
        "The user is requesting a review of a Pull Request.",
        "Instruction: Direct your analysis primarily at the changes in the listed files. "
        "You may reference other files in the repository for context (imports, definitions, "
        "usage), but report findings only if they relate to the listed changes.",
        "For Added files, review the entire file content.",
        "For Modified files, focus primarily on the changed areas.",
    ]
    for scope in scopes:
        repo_name = scope.workspace_subdir or Path(scope.source_path).name or "repository"
        lines.append("")
        lines.append(f"Repository Scope: {repo_name}")
        lines.append(f"Base reference: {scope.base_ref}")
        lines.append(f"Merge base: {scope.merge_base}")
        focus_files, focus_truncated = _truncate_file_list(scope.analyzable_files)
        scope.truncated_sections["analyzable_files"] = focus_truncated
        if focus_files:
            lines.append("Primary Focus (changed files to analyze):")
            lines.extend(f"- {path}" for path in focus_files)
            if focus_truncated:
                lines.append(
                    f"- ... ({len(scope.analyzable_files) - len(focus_files)} more files)"
                )
        else:
            lines.append("Primary Focus: No analyzable changed files detected.")
        added_files, added_truncated = _truncate_file_list(scope.added_files)
        scope.truncated_sections["added_files"] = added_truncated
        if added_files:
            lines.append("Added files (review entire file):")
            lines.extend(f"- {path}" for path in added_files)
            if added_truncated:
                lines.append(f"- ... ({len(scope.added_files) - len(added_files)} more files)")
        modified_files, modified_truncated = _truncate_file_list(scope.modified_files)
        scope.truncated_sections["modified_files"] = modified_truncated
        if modified_files:
            lines.append("Modified files (focus on changes):")
            lines.extend(f"- {path}" for path in modified_files)
            if modified_truncated:
                lines.append(
                    f"- ... ({len(scope.modified_files) - len(modified_files)} more files)"
                )
        if scope.renamed_files:
            rename_lines = []
            for rename in scope.renamed_files:
                old_path = rename.get("old_path") or "unknown"
                new_path = rename.get("new_path") or "unknown"
                similarity = rename.get("similarity")
                if isinstance(similarity, int):
                    rename_lines.append(f"- {old_path} -> {new_path} (similarity {similarity}%)")
                else:
                    rename_lines.append(f"- {old_path} -> {new_path}")
            lines.append("Renamed files:")
            lines.extend(rename_lines)
        deleted_files, deleted_truncated = _truncate_file_list(scope.deleted_files)
        scope.truncated_sections["deleted_files"] = deleted_truncated
        if deleted_files:
            lines.append("Note: These files were deleted (context only, not analyzable):")
            lines.extend(f"- {path}" for path in deleted_files)
            if deleted_truncated:
                lines.append(f"- ... ({len(scope.deleted_files) - len(deleted_files)} more files)")
    return "\n".join(lines).strip()
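The per-repository section produced above follows a fixed header-then-list layout. A condensed standalone sketch of just that layout, with hypothetical values:

```python
def scope_header(repo_name: str, base_ref: str, merge_base: str, focus: list[str]) -> str:
    # Condensed version of build_diff_scope_instruction's per-repository section.
    lines = [
        f"Repository Scope: {repo_name}",
        f"Base reference: {base_ref}",
        f"Merge base: {merge_base}",
        "Primary Focus (changed files to analyze):",
    ]
    lines.extend(f"- {p}" for p in focus)
    return "\n".join(lines)


text = scope_header("app", "refs/remotes/origin/main", "abc1234", ["src/auth.py"])
print(text)
```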
def _should_activate_auto_scope(
    local_sources: list[dict[str, str]], non_interactive: bool, env: dict[str, str]
) -> bool:
    if not local_sources:
        return False
    if not non_interactive:
        return False
    if not _is_ci_environment(env):
        return False
    if _is_pr_environment(env):
        return True
    for source in local_sources:
        source_path = source.get("source_path")
        if not source_path:
            continue
        repo_path = Path(source_path)
        if not _is_git_repo(repo_path):
            continue
        current_branch = _get_current_branch_name(repo_path)
        default_branch = _resolve_default_branch_name(repo_path, env)
        if current_branch and default_branch and current_branch != default_branch:
            return True
    return False
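The PR detection that feeds this gate requires a truthy environment value, so an empty variable does not activate diff-scope. A standalone copy of that check for illustration:

```python
def is_pr_environment(env: dict[str, str]) -> bool:
    # Same variable set as _is_pr_environment above; values must be non-empty,
    # so a merely-present but blank variable does not signal a PR/MR run.
    keys = (
        "GITHUB_BASE_REF",
        "GITHUB_HEAD_REF",
        "CI_MERGE_REQUEST_TARGET_BRANCH_NAME",
        "GITLAB_MERGE_REQUEST_TARGET_BRANCH_NAME",
        "SYSTEM_PULLREQUEST_TARGETBRANCH",
    )
    return any(env.get(k) for k in keys)


print(is_pr_environment({"GITHUB_BASE_REF": "main"}))
```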
def _resolve_repo_diff_scope(
source: dict[str, str], diff_base: str | None, env: dict[str, str]
) -> RepoDiffScope:
source_path = source.get("source_path", "")
workspace_subdir = source.get("workspace_subdir")
repo_path = Path(source_path)
if not _is_git_repo(repo_path):
raise ValueError(f"Source is not a git repository: {source_path}")
if _is_repo_shallow(repo_path):
raise ValueError(
"Strix requires full git history for diff-scope. Please set fetch-depth: 0 "
"in your CI config."
)
base_ref = _resolve_base_ref(repo_path, diff_base, env)
merge_base_result = _run_git_command(repo_path, ["merge-base", base_ref, "HEAD"], check=False)
if merge_base_result.returncode != 0:
stderr = merge_base_result.stderr.strip()
raise ValueError(
f"Unable to compute merge-base against '{base_ref}' for '{source_path}'. "
f"{stderr or 'Ensure the base branch history is fetched and reachable.'}"
)
merge_base = merge_base_result.stdout.strip()
if not merge_base:
raise ValueError(
f"Unable to compute merge-base against '{base_ref}' for '{source_path}'. "
"Ensure the base branch history is fetched and reachable."
)
diff_result = _run_git_command_raw(
repo_path,
[
"diff",
"--name-status",
"-z",
"--find-renames",
"--find-copies",
f"{merge_base}...HEAD",
],
check=False,
)
if diff_result.returncode != 0:
stderr = diff_result.stderr.decode("utf-8", errors="replace").strip()
raise ValueError(
f"Unable to resolve changed files for '{source_path}'. "
f"{stderr or 'Ensure the repository has enough history for diff-scope.'}"
)
entries = _parse_name_status_z(diff_result.stdout)
classified = _classify_diff_entries(entries)
return RepoDiffScope(
source_path=source_path,
workspace_subdir=workspace_subdir,
base_ref=base_ref,
merge_base=merge_base,
added_files=classified["added_files"],
modified_files=classified["modified_files"],
renamed_files=classified["renamed_files"],
deleted_files=classified["deleted_files"],
analyzable_files=classified["analyzable_files"],
)
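`_parse_name_status_z` is called above but defined outside this hunk. A hypothetical sketch of parsing `git diff --name-status -z` output, assuming the documented NUL-separated record layout (a status such as `A`, `M`, `D`, or `R100`, followed by one path, or two paths for renames and copies):

```python
# Illustrative parser for `git diff --name-status -z` output; not the real
# _parse_name_status_z, which lives outside this hunk.

def parse_name_status_z_sketch(raw: bytes) -> list[dict[str, str]]:
    fields = [f for f in raw.decode("utf-8", errors="replace").split("\0") if f]
    entries: list[dict[str, str]] = []
    i = 0
    while i < len(fields):
        status = fields[i]
        if status[:1] in {"R", "C"}:
            # Renames/copies carry a similarity score and consume two paths.
            entries.append(
                {"status": status, "old_path": fields[i + 1], "new_path": fields[i + 2]}
            )
            i += 3
        else:
            entries.append({"status": status, "path": fields[i + 1]})
            i += 2
    return entries
```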
def resolve_diff_scope_context(
local_sources: list[dict[str, str]],
scope_mode: str,
diff_base: str | None,
non_interactive: bool,
env: dict[str, str] | None = None,
) -> DiffScopeResult:
if scope_mode not in _SUPPORTED_SCOPE_MODES:
raise ValueError(f"Unsupported scope mode: {scope_mode}")
env_map = dict(os.environ if env is None else env)
if scope_mode == "full":
return DiffScopeResult(
active=False,
mode=scope_mode,
metadata={"active": False, "mode": scope_mode},
)
if scope_mode == "auto":
should_activate = _should_activate_auto_scope(local_sources, non_interactive, env_map)
if not should_activate:
return DiffScopeResult(
active=False,
mode=scope_mode,
metadata={"active": False, "mode": scope_mode},
)
if not local_sources:
raise ValueError("Diff-scope is active, but no local repository targets were provided.")
repo_scopes: list[RepoDiffScope] = []
skipped_non_git: list[str] = []
skipped_diff_scope: list[str] = []
for source in local_sources:
source_path = source.get("source_path")
if not source_path:
continue
if not _is_git_repo(Path(source_path)):
skipped_non_git.append(source_path)
continue
try:
repo_scopes.append(_resolve_repo_diff_scope(source, diff_base, env_map))
except ValueError as e:
if scope_mode == "auto":
skipped_diff_scope.append(f"{source_path} (diff-scope skipped: {e})")
continue
raise
if not repo_scopes:
if scope_mode == "auto":
metadata: dict[str, Any] = {"active": False, "mode": scope_mode}
if skipped_non_git:
metadata["skipped_non_git_sources"] = skipped_non_git
if skipped_diff_scope:
metadata["skipped_diff_scope_sources"] = skipped_diff_scope
return DiffScopeResult(active=False, mode=scope_mode, metadata=metadata)
raise ValueError(
"Diff-scope is active, but no Git repositories were found. "
"Use --scope-mode full to disable diff-scope for this run."
)
instruction_block = build_diff_scope_instruction(repo_scopes)
metadata: dict[str, Any] = {
"active": True,
"mode": scope_mode,
"repos": [scope.to_metadata() for scope in repo_scopes],
"total_repositories": len(repo_scopes),
"total_analyzable_files": sum(len(scope.analyzable_files) for scope in repo_scopes),
"total_deleted_files": sum(len(scope.deleted_files) for scope in repo_scopes),
}
if skipped_non_git:
metadata["skipped_non_git_sources"] = skipped_non_git
if skipped_diff_scope:
metadata["skipped_diff_scope_sources"] = skipped_diff_scope
return DiffScopeResult(
active=True,
mode=scope_mode,
instruction_block=instruction_block,
metadata=metadata,
)
def _is_http_git_repo(url: str) -> bool:
check_url = f"{url.rstrip('/')}/info/refs?service=git-upload-pack"

View File

@@ -13,6 +13,7 @@ class LLMConfig:
skills: list[str] | None = None,
timeout: int | None = None,
scan_mode: str = "deep",
is_whitebox: bool = False,
interactive: bool = False,
reasoning_effort: str | None = None,
system_prompt_context: dict[str, Any] | None = None,
@@ -33,7 +34,7 @@ class LLMConfig:
self.timeout = timeout or int(Config.get("llm_timeout") or "300")
self.scan_mode = scan_mode if scan_mode in ["quick", "standard", "deep"] else "deep"
self.is_whitebox = is_whitebox
self.interactive = interactive
self.reasoning_effort = reasoning_effort
self.system_prompt_context = system_prompt_context or {}

View File

@@ -111,6 +111,9 @@ class LLM:
def _get_skills_to_load(self) -> list[str]:
ordered_skills = [*self._active_skills]
ordered_skills.append(f"scan_modes/{self.config.scan_mode}")
if self.config.is_whitebox:
ordered_skills.append("coordination/source_aware_whitebox")
ordered_skills.append("custom/source_aware_sast")
deduped: list[str] = []
seen: set[str] = set()

View File

@@ -38,6 +38,10 @@ The skills are dynamically injected into the agent's system prompt, allowing it
| **`/reconnaissance`** | Advanced information gathering and enumeration techniques for comprehensive attack surface mapping |
| **`/custom`** | Community-contributed skills for specialized or industry-specific testing scenarios |
Notable source-aware skills:
- `source_aware_whitebox` (coordination): white-box orchestration playbook
- `source_aware_sast` (custom): semgrep/AST/secrets/supply-chain static triage workflow
---
## 🎨 Creating New Skills

View File

@@ -0,0 +1,70 @@
---
name: source-aware-whitebox
description: Coordination playbook for source-aware white-box testing with static triage and dynamic validation
---
# Source-Aware White-Box Coordination
Use this coordination playbook when repository source code is available.
## Objective
Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is the expected default whenever source is available.
## Recommended Workflow
1. Build a quick source map before deep exploitation, including at least one AST-structural pass (`sg` or `tree-sitter`) scoped to relevant paths.
   - For `sg` baseline, derive `sg-targets.txt` from `semgrep.json` scope first (`paths.scanned`, fallback to unique `results[].path`) and run `xargs ... sg run` on that list.
   - Only fall back to path heuristics when semgrep scope is unavailable, and record the fallback reason in the repo wiki.
2. Run first-pass static triage to rank high-risk paths.
3. Use triage outputs to prioritize dynamic PoC validation.
4. Keep findings evidence-driven: no report without validation.
5. Keep shared wiki memory current so all agents can reuse context.
## Source-Aware Triage Stack
- `semgrep`: fast security-first triage and custom pattern scans
- `ast-grep` (`sg`): structural pattern hunting and targeted repo mapping
- `tree-sitter`: syntax-aware parsing support for symbol and route extraction
- `gitleaks` + `trufflehog`: complementary secret detection (working tree and history coverage)
- `trivy fs`: dependency, misconfiguration, license, and secret checks
Coverage target per repository:
- one `semgrep` pass
- one AST structural pass (`sg` and/or `tree-sitter`)
- one secrets pass (`gitleaks` and/or `trufflehog`)
- one `trivy fs` pass
- if any part is skipped, log the reason in the shared wiki note
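The coverage checklist above can be audited mechanically. A minimal illustrative helper (not part of Strix) that reports which baseline passes left no artifact behind, so the skip reason can be logged in the shared wiki note; the artifact filenames match this playbook's baseline bundle:

```python
# Hypothetical coverage audit: list which expected scanner artifacts are
# missing from the artifact directory. Names mirror the baseline bundle
# (semgrep.json, ast-grep.json, gitleaks.json, trivy-fs.json).
from pathlib import Path

EXPECTED_ARTIFACTS = {
    "ast": "ast-grep.json",
    "secrets": "gitleaks.json",
    "semgrep": "semgrep.json",
    "supply-chain": "trivy-fs.json",
}


def missing_passes(artifact_dir: str) -> list[str]:
    base = Path(artifact_dir)
    return sorted(
        name
        for name, filename in EXPECTED_ARTIFACTS.items()
        if not (base / filename).is_file()
    )
```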
## Agent Delegation Guidance
- Keep child agents specialized by vulnerability/component as usual.
- For source-heavy subtasks, prefer creating child agents with `source_aware_sast` skill.
- Use source findings to shape payloads and endpoint selection for dynamic testing.
## Wiki Note Requirement (Source Map)
When source is present, maintain two stable wiki notes per repository and keep them current:
- `wiki:overview` for architecture/source-map context
- `wiki:security` for scanner and validation deltas
Operational rules:
- At task start, call `list_notes` with `category=wiki`; read `wiki:overview` first, then `wiki:security` via `get_note(note_id=...)`.
- If wiki notes are missing, create them with `create_note`, `category=wiki`, and tags including `wiki:overview` or `wiki:security`.
- Update existing notes via `update_note`; avoid creating duplicates.
- Child agents should read both notes first, then extend with new evidence from their scope.
- Before calling `agent_finish`, each source-focused child agent should append a short delta update to `wiki:security` (scanner outputs, route/sink map deltas, dynamic follow-ups).
Recommended sections:
- Architecture overview
- Entrypoints and routing
- AuthN/AuthZ model
- High-risk sinks and trust boundaries
- Static scanner summary
- Dynamic validation follow-ups
## Validation Guardrails
- Static findings are hypotheses until validated.
- Dynamic exploitation evidence is still required before vulnerability reporting.
- Keep scanner output concise, deduplicated, and mapped to concrete code locations.

View File

@@ -0,0 +1,167 @@
---
name: source-aware-sast
description: Practical source-aware SAST and AST playbook for semgrep, ast-grep, gitleaks, and trivy fs
---
# Source-Aware SAST Playbook
Use this skill for source-heavy analysis where static and structural signals should guide dynamic testing.
## Fast Start
Run tools from repo root and store outputs in a dedicated artifact directory:
```bash
mkdir -p /workspace/.strix-source-aware
```
Before scanning, check shared wiki memory:
```text
1) list_notes(category="wiki")
2) get_note(note_id=...) for `wiki:overview` first, then `wiki:security`
3) Reuse matching repo wiki notes if present
4) create_note(category="wiki") only if missing (with tags `wiki:overview` / `wiki:security`)
```
After every major source-analysis batch, update `wiki:security` with `update_note` so other agents can reuse your latest map.
## Baseline Coverage Bundle (Recommended)
Run this baseline once per repository before deep narrowing:
```bash
ART=/workspace/.strix-source-aware
mkdir -p "$ART"
semgrep scan --config p/default --config p/golang --config p/secrets \
--metrics=off --json --output "$ART/semgrep.json" .
# Build deterministic AST targets from semgrep scope (no hardcoded path guessing)
python3 - <<'PY'
import json
from pathlib import Path
art = Path("/workspace/.strix-source-aware")
semgrep_json = art / "semgrep.json"
targets_file = art / "sg-targets.txt"
try:
data = json.loads(semgrep_json.read_text(encoding="utf-8"))
except Exception:
targets_file.write_text("", encoding="utf-8")
raise
scanned = data.get("paths", {}).get("scanned") or []
if not scanned:
scanned = sorted(
{
r.get("path")
for r in data.get("results", [])
if isinstance(r, dict) and isinstance(r.get("path"), str) and r.get("path")
}
)
bounded = scanned[:4000]
targets_file.write_text("".join(f"{p}\n" for p in bounded), encoding="utf-8")
print(f"sg-targets: {len(bounded)}")
PY
xargs -r -n 200 sg run --pattern '$F($$$ARGS)' --json=stream < "$ART/sg-targets.txt" \
> "$ART/ast-grep.json" 2> "$ART/ast-grep.log" || true
gitleaks detect --source . --report-format json --report-path "$ART/gitleaks.json" || true
trufflehog filesystem --no-update --json --no-verification . > "$ART/trufflehog.json" || true
# Keep trivy focused on vuln/misconfig (secrets already covered above) and increase timeout for large repos
trivy fs --scanners vuln,misconfig --timeout 30m --offline-scan \
--format json --output "$ART/trivy-fs.json" . || true
```
If one tool is skipped or fails, record that in `wiki:security` along with the reason.
## Semgrep First Pass
Use Semgrep as the default static triage pass:
```bash
# Preferred deterministic profile set (works with --metrics=off)
semgrep scan --config p/default --config p/golang --config p/secrets \
--metrics=off --json --output /workspace/.strix-source-aware/semgrep.json .
# If you choose auto config, do not combine it with --metrics=off
semgrep scan --config auto --json --output /workspace/.strix-source-aware/semgrep-auto.json .
```
If diff scope is active, restrict to changed files first, then expand only when needed.
## AST-Grep Structural Mapping
Use `sg` for structure-aware code hunting:
```bash
# Ruleless structural pass over deterministic target list (no sgconfig.yml required)
xargs -r -n 200 sg run --pattern '$F($$$ARGS)' --json=stream \
< /workspace/.strix-source-aware/sg-targets.txt \
> /workspace/.strix-source-aware/ast-grep.json 2> /workspace/.strix-source-aware/ast-grep.log || true
```
Target high-value patterns such as:
- missing auth checks near route handlers
- dynamic command/query construction
- unsafe deserialization or template execution paths
- file and path operations influenced by user input
## Tree-Sitter Assisted Repo Mapping
Use tree-sitter CLI for syntax-aware parsing when grep-level mapping is noisy:
```bash
tree-sitter parse -q <file>
```
Use outputs to improve route/symbol/sink maps for subsequent targeted scans.
## Secret and Supply Chain Coverage
Detect hardcoded credentials:
```bash
gitleaks detect --source . --report-format json --report-path /workspace/.strix-source-aware/gitleaks.json
trufflehog filesystem --json . > /workspace/.strix-source-aware/trufflehog.json
```
Run repository-wide dependency and config checks:
```bash
trivy fs --scanners vuln,misconfig --timeout 30m --offline-scan \
--format json --output /workspace/.strix-source-aware/trivy-fs.json . || true
```
## Converting Static Signals Into Exploits
1. Rank candidates by impact and exploitability.
2. Trace source-to-sink flow for top candidates.
3. Build dynamic PoCs that reproduce the suspected issue.
4. Report only after dynamic validation succeeds.
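Step 1 above can be sketched against semgrep's JSON output. A minimal example, assuming the standard `results[].extra.severity` field; the severity weighting is illustrative, not a Strix convention:

```python
# Rank semgrep findings by severity so the highest-impact candidates get
# source-to-sink tracing first. Weights are illustrative.
import json

_SEVERITY_RANK = {"ERROR": 3, "WARNING": 2, "INFO": 1}


def rank_semgrep_results(semgrep_json: str) -> list[dict]:
    data = json.loads(semgrep_json)
    return sorted(
        data.get("results", []),
        key=lambda r: _SEVERITY_RANK.get(r.get("extra", {}).get("severity", ""), 0),
        reverse=True,
    )
```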
## Wiki Update Template
Keep `wiki:overview` and `wiki:security` per repository. Update these sections in `wiki:security`:
```text
## Architecture
## Entrypoints
## AuthN/AuthZ
## High-Risk Sinks
## Static Findings Summary
## Dynamic Validation Follow-Ups
```
Before `agent_finish`, make one final `update_note` call to capture:
- scanner artifacts and paths
- top validated/invalidated hypotheses
- concrete dynamic follow-up tasks
## Anti-Patterns
- Do not treat scanner output as final truth.
- Do not spend full cycles on low-signal pattern matches.
- Do not report source-only findings without validation evidence.
- Do not create duplicate `wiki:overview` or `wiki:security` notes for the same repository.

View File

@@ -15,6 +15,11 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
**Whitebox (source available)**
- Map every file, module, and code path in the repository
- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")`, then `get_note(note_id=...)` for `wiki:overview` and `wiki:security`), then continuously update `wiki:security`
- Start with broad source-aware triage (`semgrep`, `ast-grep`, `gitleaks`, `trufflehog`, `trivy fs`) and use outputs to drive deep review
- Execute at least one structural AST pass (`sg` and/or Tree-sitter) per repository and store artifacts for reuse
- Keep AST artifacts bounded and query-driven (target relevant paths/sinks first; avoid whole-repo generic function dumps)
- Use syntax-aware parsing (Tree-sitter tooling) to improve symbol, route, and sink extraction quality
- Trace all entry points from HTTP handlers to database queries
- Document all authentication mechanisms and implementations
- Map authorization checks and access control model
@@ -25,7 +30,8 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
- Identify all serialization/deserialization points
- Review file handling: upload, download, processing
- Understand the deployment model and infrastructure assumptions
- Check all dependency versions against CVE databases
- Check all dependency versions and repository risks against CVE/misconfiguration data
- Before final completion, update `wiki:security` with scanner summary + dynamic follow-ups
**Blackbox (no source)**
- Exhaustive subdomain enumeration with multiple sources and tools

View File

@@ -15,9 +15,15 @@ Optimize for fast feedback on critical security issues. Skip exhaustive enumerat
**Whitebox (source available)**
- Focus on recent changes: git diffs, new commits, modified files—these are most likely to contain fresh bugs
- Read existing `wiki` notes first (`list_notes(category="wiki")`, then `get_note(note_id=...)` for `wiki:overview` and `wiki:security`) to avoid remapping from scratch
- Run a fast static triage on changed files first (`semgrep`, then targeted `sg` queries)
- Run at least one lightweight AST pass (`sg` or Tree-sitter) so structural mapping is not skipped
- Keep AST commands tightly scoped to changed or high-risk paths; avoid broad repository-wide pattern dumps
- Run quick secret and dependency checks (`gitleaks`, `trufflehog`, `trivy fs`) scoped to changed areas when possible
- Identify security-sensitive patterns in changed code: auth checks, input handling, database queries, file operations
- Trace user input through modified code paths
- Check if security controls were modified or bypassed
- Before completion, update `wiki:security` with what changed and what needs dynamic follow-up
**Blackbox (no source)**
- Map authentication and critical user flows

View File

@@ -15,12 +15,17 @@ Systematic testing across the full attack surface. Understand the application be
**Whitebox (source available)**
- Map codebase structure: modules, entry points, routing
- Start by loading existing `wiki` notes (`list_notes(category="wiki")`, then `get_note(note_id=...)` for `wiki:overview` and `wiki:security`) and update `wiki:security` as mapping evolves
- Run `semgrep` first-pass triage to prioritize risky flows before deep manual review
- Run at least one AST-structural mapping pass (`sg` and/or Tree-sitter), then use outputs for route, sink, and trust-boundary mapping
- Keep AST output bounded to relevant paths and hypotheses; avoid whole-repo generic function dumps
- Identify architecture pattern (MVC, microservices, monolith)
- Trace input vectors: forms, APIs, file uploads, headers, cookies
- Review authentication and authorization flows
- Analyze database interactions and ORM usage
- Check dependencies for known CVEs
- Check dependencies and repo risks with `trivy fs`, `gitleaks`, and `trufflehog`
- Understand the data model and sensitive data locations
- Before completion, update `wiki:security` with source findings summary and dynamic validation next steps
**Blackbox (no source)**
- Crawl application thoroughly, interact with every feature

View File

@@ -1,5 +1,6 @@
import threading
from datetime import UTC, datetime
import re
from typing import Any, Literal
from strix.tools.registry import register_tool
@@ -21,6 +22,200 @@ _agent_instances: dict[str, Any] = {}
_agent_states: dict[str, Any] = {}
def _is_whitebox_agent(agent_id: str) -> bool:
agent = _agent_instances.get(agent_id)
return bool(getattr(getattr(agent, "llm_config", None), "is_whitebox", False))
def _extract_repo_tags(agent_state: Any | None) -> set[str]:
repo_tags: set[str] = set()
if agent_state is None:
return repo_tags
task_text = str(getattr(agent_state, "task", "") or "")
for workspace_subdir in re.findall(r"/workspace/([A-Za-z0-9._-]+)", task_text):
repo_tags.add(f"repo:{workspace_subdir.lower()}")
for repo_name in re.findall(r"github\.com/[^/\s]+/([A-Za-z0-9._-]+)", task_text):
normalized = repo_name.removesuffix(".git").lower()
if normalized:
repo_tags.add(f"repo:{normalized}")
return repo_tags
def _extract_wiki_kind(note: dict[str, Any]) -> str:
note_kind = str(note.get("wiki_kind") or "").strip().lower()
if note_kind in {"overview", "security", "general"}:
return note_kind
note_tags = note.get("tags") or []
if isinstance(note_tags, list):
normalized_tags = {str(tag).strip().lower() for tag in note_tags if str(tag).strip()}
if "wiki:overview" in normalized_tags:
return "overview"
if "wiki:security" in normalized_tags:
return "security"
title = str(note.get("title") or "").lower()
if "overview" in title or "architecture" in title:
return "overview"
if "security" in title or "vuln" in title or "finding" in title:
return "security"
return "general"
def _load_primary_wiki_note(
agent_state: Any | None = None,
preferred_kind: str | None = None,
allow_kind_fallback: bool = True,
) -> dict[str, Any] | None:
try:
from strix.tools.notes.notes_actions import get_note, list_notes
notes_result = list_notes(category="wiki")
if not notes_result.get("success"):
return None
notes = notes_result.get("notes") or []
if not notes:
return None
candidate_notes = notes
selected_note_id = None
repo_tags = _extract_repo_tags(agent_state)
if repo_tags:
tagged_notes = []
for note in notes:
note_tags = note.get("tags") or []
if not isinstance(note_tags, list):
continue
normalized_note_tags = {str(tag).strip().lower() for tag in note_tags if str(tag).strip()}
if normalized_note_tags.intersection(repo_tags):
tagged_notes.append(note)
if tagged_notes:
candidate_notes = tagged_notes
normalized_kind = (preferred_kind or "").strip().lower()
if normalized_kind in {"overview", "security", "general"}:
for note in candidate_notes:
if _extract_wiki_kind(note) == normalized_kind:
selected_note_id = note.get("note_id")
break
if not selected_note_id and (not normalized_kind or allow_kind_fallback):
selected_note_id = candidate_notes[0].get("note_id")
note_id = selected_note_id
if not isinstance(note_id, str) or not note_id:
return None
note_result = get_note(note_id=note_id)
if not note_result.get("success"):
return None
note = note_result.get("note")
if not isinstance(note, dict):
return None
except Exception:
return None
else:
return note
def _inject_wiki_context_for_whitebox(agent_state: Any) -> None:
if not _is_whitebox_agent(agent_state.agent_id):
return
overview_note = _load_primary_wiki_note(
agent_state,
preferred_kind="overview",
allow_kind_fallback=False,
)
security_note = _load_primary_wiki_note(
agent_state,
preferred_kind="security",
allow_kind_fallback=True,
)
notes_to_embed: list[tuple[str, dict[str, Any]]] = []
if isinstance(overview_note, dict):
notes_to_embed.append(("overview", overview_note))
if isinstance(security_note, dict):
overview_note_id = str(overview_note.get("note_id")) if isinstance(overview_note, dict) else ""
security_note_id = str(security_note.get("note_id"))
if not overview_note_id or overview_note_id != security_note_id:
notes_to_embed.append(("security", security_note))
max_chars = 4000
for wiki_kind, note in notes_to_embed:
title = str(note.get("title") or "repo wiki")
content = str(note.get("content") or "").strip()
if not content:
continue
truncated_content = content[:max_chars]
suffix = "\n\n[truncated for context size]" if len(content) > max_chars else ""
agent_state.add_message(
"user",
(
f"<shared_repo_wiki type=\"{wiki_kind}\" title=\"{title}\">\n"
f"{truncated_content}{suffix}\n"
"</shared_repo_wiki>"
),
)
def _append_wiki_update_on_finish(
agent_state: Any,
agent_name: str,
result_summary: str,
findings: list[str] | None,
final_recommendations: list[str] | None,
) -> None:
if not _is_whitebox_agent(agent_state.agent_id):
return
try:
from strix.tools.notes.notes_actions import append_note_content
note = _load_primary_wiki_note(
agent_state,
preferred_kind="security",
allow_kind_fallback=True,
)
if not note:
return
note_id = note.get("note_id")
if not isinstance(note_id, str) or not note_id:
return
timestamp = datetime.now(UTC).isoformat()
summary = " ".join(str(result_summary).split())
if len(summary) > 1200:
summary = f"{summary[:1197]}..."
findings_lines = "\n".join(f"- {item}" for item in (findings or [])) or "- none"
recommendation_lines = (
"\n".join(f"- {item}" for item in (final_recommendations or [])) or "- none"
)
delta = (
f"\n\n## Agent Update: {agent_name} ({timestamp})\n"
f"Summary: {summary}\n\n"
"Findings:\n"
f"{findings_lines}\n\n"
"Recommendations:\n"
f"{recommendation_lines}\n"
)
append_note_content(note_id=note_id, delta=delta)
except Exception:
# Best-effort update; never block agent completion on note persistence.
return
def _run_agent_in_thread(
agent: Any, state: Any, inherited_messages: list[dict[str, Any]]
) -> dict[str, Any]:
@@ -31,6 +226,8 @@ def _run_agent_in_thread(
state.add_message(msg["role"], msg["content"])
state.add_message("user", "</inherited_context_from_parent>")
_inject_wiki_context_for_whitebox(state)
parent_info = _agent_graph["nodes"].get(state.parent_id, {})
parent_name = parent_info.get("name", "Unknown Parent")
@@ -39,6 +236,18 @@ def _run_agent_in_thread(
if inherited_messages
else "started with a fresh context"
)
wiki_memory_instruction = ""
if getattr(getattr(agent, "llm_config", None), "is_whitebox", False):
wiki_memory_instruction = (
'\n - White-box memory (recommended): call list_notes(category="wiki"), read '
"wiki:overview first when available, then wiki:security via get_note(note_id=...) before substantive work (including terminal scans)"
"\n - Prefer two stable wiki notes per repo: one tagged wiki:overview and one tagged wiki:security; avoid duplicates"
"\n - Before agent_finish, call list_notes(category=\"wiki\") + get_note(note_id=...) again, then append a short scope delta via update_note to wiki:security (new routes/sinks, scanner results, dynamic follow-ups)"
"\n - If terminal output contains `command not found` or shell parse errors, correct and rerun before using the result"
"\n - Use ASCII-only shell commands; if a command includes unexpected non-ASCII characters, rerun with a clean ASCII command"
"\n - Keep AST artifacts bounded: target relevant paths and avoid whole-repo generic function dumps"
"\n - Source-aware tooling is advisory: choose semgrep/AST/tree-sitter/gitleaks/trivy when relevant, do not force static steps for purely dynamic validation tasks"
)
task_xml = f"""<agent_delegation>
<identity>
@@ -64,6 +273,7 @@ def _run_agent_in_thread(
- All agents share /workspace directory and proxy history for better collaboration
- You can see files created by other agents and proxy traffic from previous work
- Build upon previous work but focus on your specific delegated task
{wiki_memory_instruction}
</instructions>
</agent_delegation>"""
@@ -214,14 +424,34 @@ def create_agent(
timeout = None
scan_mode = "deep"
is_whitebox = False
interactive = False
if parent_agent and hasattr(parent_agent, "llm_config"):
if hasattr(parent_agent.llm_config, "timeout"):
timeout = parent_agent.llm_config.timeout
if hasattr(parent_agent.llm_config, "scan_mode"):
scan_mode = parent_agent.llm_config.scan_mode
if hasattr(parent_agent.llm_config, "is_whitebox"):
is_whitebox = parent_agent.llm_config.is_whitebox
interactive = getattr(parent_agent.llm_config, "interactive", False)
if is_whitebox:
whitebox_guidance = (
"\n\nWhite-box execution guidance (recommended when source is available):\n"
"- Use structural AST mapping (`sg` or `tree-sitter`) where it helps source analysis; "
"keep artifacts bounded and skip forced AST steps for purely dynamic validation tasks.\n"
"- Keep AST output bounded: scope to relevant paths/files, avoid whole-repo "
"generic function patterns, and cap artifact size.\n"
'- Use shared wiki memory by calling list_notes(category="wiki"), reading wiki:overview first '
"then wiki:security via get_note(note_id=...).\n"
'- Before agent_finish, call list_notes(category="wiki") + get_note(note_id=...) '
"again, and append updates to wiki:security.\n"
"- If terminal output contains `command not found` or shell parse errors, "
"correct and rerun before using the result."
)
if "White-box execution guidance (recommended when source is available):" not in task:
task = f"{task.rstrip()}{whitebox_guidance}"
state = AgentState(
task=task,
agent_name=name,
@@ -229,11 +459,11 @@ def create_agent(
max_iterations=300,
waiting_timeout=300 if interactive else 600,
)
llm_config = LLMConfig(
skills=skill_list,
timeout=timeout,
scan_mode=scan_mode,
is_whitebox=is_whitebox,
interactive=interactive,
)
@@ -382,6 +612,14 @@ def agent_finish(
"recommendations": final_recommendations or [],
}
_append_wiki_update_on_finish(
agent_state=agent_state,
agent_name=agent_node["name"],
result_summary=result_summary,
findings=findings,
final_recommendations=final_recommendations,
)
parent_notified = False
if report_to_parent and agent_node["parent_id"]:

View File

@@ -1,6 +1,7 @@
from .notes_actions import (
create_note,
delete_note,
get_note,
list_notes,
update_note,
)
@@ -9,6 +10,7 @@ from .notes_actions import (
__all__ = [
"create_note",
"delete_note",
"get_note",
"list_notes",
"update_note",
]

View File

@@ -1,11 +1,237 @@
import json
import threading
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from strix.tools.registry import register_tool
_notes_storage: dict[str, dict[str, Any]] = {}
_VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "wiki"]
_notes_lock = threading.RLock()
_loaded_notes_run_dir: str | None = None
_DEFAULT_CONTENT_PREVIEW_CHARS = 280
def _note_tag_set(note: dict[str, Any]) -> set[str]:
tags = note.get("tags", [])
if not isinstance(tags, list):
return set()
return {str(tag).strip().lower() for tag in tags if str(tag).strip()}
def _infer_wiki_kind(note: dict[str, Any]) -> str:
tag_set = _note_tag_set(note)
if "wiki:overview" in tag_set:
return "overview"
if "wiki:security" in tag_set:
return "security"
title = str(note.get("title", "")).lower()
if "overview" in title or "architecture" in title:
return "overview"
if "security" in title or "vuln" in title or "finding" in title:
return "security"
return "general"
def _get_run_dir() -> Path | None:
try:
from strix.telemetry.tracer import get_global_tracer
tracer = get_global_tracer()
if not tracer:
return None
return tracer.get_run_dir()
except (ImportError, OSError, RuntimeError):
return None
def _get_notes_jsonl_path() -> Path | None:
run_dir = _get_run_dir()
if not run_dir:
return None
notes_dir = run_dir / "notes"
notes_dir.mkdir(parents=True, exist_ok=True)
return notes_dir / "notes.jsonl"
def _append_note_event(op: str, note_id: str, note: dict[str, Any] | None = None) -> None:
notes_path = _get_notes_jsonl_path()
if not notes_path:
return
event: dict[str, Any] = {
"timestamp": datetime.now(UTC).isoformat(),
"op": op,
"note_id": note_id,
}
if note is not None:
event["note"] = note
with notes_path.open("a", encoding="utf-8") as f:
f.write(f"{json.dumps(event, ensure_ascii=True)}\n")
def _load_notes_from_jsonl(notes_path: Path) -> dict[str, dict[str, Any]]:
hydrated: dict[str, dict[str, Any]] = {}
if not notes_path.exists():
return hydrated
with notes_path.open(encoding="utf-8") as f:
for raw_line in f:
line = raw_line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
op = str(event.get("op", "")).strip().lower()
note_id = str(event.get("note_id", "")).strip()
if not note_id or op not in {"create", "update", "delete"}:
continue
if op == "delete":
hydrated.pop(note_id, None)
continue
note = event.get("note")
if not isinstance(note, dict):
continue
existing = hydrated.get(note_id, {})
existing.update(note)
hydrated[note_id] = existing
return hydrated
def _ensure_notes_loaded() -> None:
global _loaded_notes_run_dir # noqa: PLW0603
run_dir = _get_run_dir()
run_dir_key = str(run_dir.resolve()) if run_dir else "__no_run_dir__"
if _loaded_notes_run_dir == run_dir_key:
return
_notes_storage.clear()
notes_path = _get_notes_jsonl_path()
if notes_path:
_notes_storage.update(_load_notes_from_jsonl(notes_path))
try:
for note_id, note in _notes_storage.items():
if note.get("category") == "wiki":
_persist_wiki_note(note_id, note)
_persist_wiki_index()
except OSError:
pass
_loaded_notes_run_dir = run_dir_key
def _sanitize_wiki_title(title: str) -> str:
cleaned = "".join(ch.lower() if ch.isalnum() else "-" for ch in title.strip())
slug = "-".join(part for part in cleaned.split("-") if part)
return slug or "wiki-note"
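`_sanitize_wiki_title` lowercases alphanumerics, maps every other character to `-`, then collapses runs of dashes into single separators, falling back to `wiki-note` for titles with no usable characters. A mirror of that logic, runnable on its own:

```python
def sanitize_wiki_title(title: str) -> str:
    # Mirror of _sanitize_wiki_title: slugify a title for a wiki filename.
    cleaned = "".join(ch.lower() if ch.isalnum() else "-" for ch in title.strip())
    slug = "-".join(part for part in cleaned.split("-") if part)
    return slug or "wiki-note"


print(sanitize_wiki_title("Auth Flow: JWT & Sessions"))  # auth-flow-jwt-sessions
print(sanitize_wiki_title("???"))                        # wiki-note
```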
def _get_wiki_directory() -> Path | None:
try:
run_dir = _get_run_dir()
if not run_dir:
return None
wiki_dir = run_dir / "wiki"
wiki_dir.mkdir(parents=True, exist_ok=True)
except OSError:
return None
else:
return wiki_dir
def _get_wiki_index_path() -> Path | None:
wiki_dir = _get_wiki_directory()
if not wiki_dir:
return None
return wiki_dir / "index.json"
def _get_wiki_note_path(note_id: str, note: dict[str, Any]) -> Path | None:
wiki_dir = _get_wiki_directory()
if not wiki_dir:
return None
wiki_filename = note.get("wiki_filename")
if not isinstance(wiki_filename, str) or not wiki_filename.strip():
title = note.get("title", "wiki-note")
wiki_filename = f"{note_id}-{_sanitize_wiki_title(str(title))}.md"
note["wiki_filename"] = wiki_filename
return wiki_dir / wiki_filename
def _persist_wiki_note(note_id: str, note: dict[str, Any]) -> None:
wiki_path = _get_wiki_note_path(note_id, note)
if not wiki_path:
return
tags = note.get("tags", [])
tags_line = ", ".join(str(tag) for tag in tags) if isinstance(tags, list) and tags else "none"
content = (
f"# {note.get('title', 'Wiki Note')}\n\n"
f"**Note ID:** {note_id}\n"
f"**Created:** {note.get('created_at', '')}\n"
f"**Updated:** {note.get('updated_at', '')}\n"
f"**Tags:** {tags_line}\n\n"
"## Content\n\n"
f"{note.get('content', '')}\n"
)
wiki_path.write_text(content, encoding="utf-8")
def _persist_wiki_index() -> None:
index_path = _get_wiki_index_path()
if not index_path:
return
notes: list[dict[str, Any]] = []
for note_id, note in _notes_storage.items():
if note.get("category") != "wiki":
continue
wiki_path = _get_wiki_note_path(note_id, note)
notes.append(
{
"note_id": note_id,
"title": str(note.get("title", "")),
"wiki_kind": _infer_wiki_kind(note),
"tags": note.get("tags", []),
"created_at": note.get("created_at", ""),
"updated_at": note.get("updated_at", ""),
"wiki_filename": note.get("wiki_filename", ""),
"wiki_path": wiki_path.name if wiki_path else "",
}
)
notes.sort(key=lambda item: item.get("updated_at", ""), reverse=True)
payload = {"generated_at": datetime.now(UTC).isoformat(), "notes": notes}
index_path.write_text(f"{json.dumps(payload, ensure_ascii=True, indent=2)}\n", encoding="utf-8")
def _remove_wiki_note(note_id: str, note: dict[str, Any]) -> None:
wiki_path = _get_wiki_note_path(note_id, note)
if not wiki_path:
return
if wiki_path.exists():
wiki_path.unlink()
def _filter_notes(
@@ -13,6 +239,7 @@ def _filter_notes(
tags: list[str] | None = None,
search_query: str | None = None,
) -> list[dict[str, Any]]:
_ensure_notes_loaded()
filtered_notes = []
for note_id, note in _notes_storage.items():
@@ -39,50 +266,103 @@ def _filter_notes(
return filtered_notes
def _to_note_listing_entry(
note: dict[str, Any],
*,
include_content: bool = False,
) -> dict[str, Any]:
entry = {
"note_id": note.get("note_id"),
"title": note.get("title", ""),
"category": note.get("category", "general"),
"tags": note.get("tags", []),
"created_at": note.get("created_at", ""),
"updated_at": note.get("updated_at", ""),
}
wiki_filename = note.get("wiki_filename")
if isinstance(wiki_filename, str) and wiki_filename:
entry["wiki_filename"] = wiki_filename
if note.get("category") == "wiki":
entry["wiki_kind"] = _infer_wiki_kind(note)
content = str(note.get("content", ""))
if include_content:
entry["content"] = content
elif content:
if len(content) > _DEFAULT_CONTENT_PREVIEW_CHARS:
entry["content_preview"] = (
f"{content[:_DEFAULT_CONTENT_PREVIEW_CHARS].rstrip()}..."
)
else:
entry["content_preview"] = content
return entry
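Listing entries are metadata-first: full content is included only on request, otherwise callers get a capped preview with trailing whitespace stripped before the ellipsis. A sketch of the preview rule, assuming a hypothetical cap of 200 characters (the real `_DEFAULT_CONTENT_PREVIEW_CHARS` is defined elsewhere in the module and may differ):

```python
PREVIEW_CHARS = 200  # assumed cap; stands in for _DEFAULT_CONTENT_PREVIEW_CHARS


def content_preview(content: str) -> str:
    # Truncate long content; strip trailing whitespace before appending "...".
    if len(content) > PREVIEW_CHARS:
        return f"{content[:PREVIEW_CHARS].rstrip()}..."
    return content


assert content_preview("brief note") == "brief note"
assert content_preview("x" * 500).endswith("...")
```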
@register_tool(sandbox_execution=False)
def create_note(  # noqa: PLR0911
    title: str,
    content: str,
    category: str = "general",
    tags: list[str] | None = None,
) -> dict[str, Any]:
    with _notes_lock:
        try:
            _ensure_notes_loaded()
            if not title or not title.strip():
                return {"success": False, "error": "Title cannot be empty", "note_id": None}
            if not content or not content.strip():
                return {"success": False, "error": "Content cannot be empty", "note_id": None}
            if category not in _VALID_NOTE_CATEGORIES:
                return {
                    "success": False,
                    "error": (
                        f"Invalid category. Must be one of: {', '.join(_VALID_NOTE_CATEGORIES)}"
                    ),
                    "note_id": None,
                }
            note_id = ""
            for _ in range(20):
                candidate = str(uuid.uuid4())[:5]
                if candidate not in _notes_storage:
                    note_id = candidate
                    break
            if not note_id:
                return {"success": False, "error": "Failed to allocate note ID", "note_id": None}
            timestamp = datetime.now(UTC).isoformat()
            note = {
                "title": title.strip(),
                "content": content.strip(),
                "category": category,
                "tags": tags or [],
                "created_at": timestamp,
                "updated_at": timestamp,
            }
            _notes_storage[note_id] = note
            _append_note_event("create", note_id, note)
            if category == "wiki":
                _persist_wiki_note(note_id, note)
                _persist_wiki_index()
        except (ValueError, TypeError) as e:
            return {"success": False, "error": f"Failed to create note: {e}", "note_id": None}
        except OSError as e:
            return {"success": False, "error": f"Failed to persist wiki note: {e}", "note_id": None}
        else:
            return {
                "success": True,
                "note_id": note_id,
                "message": f"Note '{title}' created successfully",
            }
@register_tool(sandbox_execution=False)
@@ -90,23 +370,85 @@ def list_notes(
category: str | None = None,
tags: list[str] | None = None,
search: str | None = None,
include_content: bool = False,
) -> dict[str, Any]:
    with _notes_lock:
        try:
            filtered_notes = _filter_notes(category=category, tags=tags, search_query=search)
            notes = [
                _to_note_listing_entry(note, include_content=include_content)
                for note in filtered_notes
            ]
            return {
                "success": True,
                "notes": notes,
                "total_count": len(notes),
            }
        except (ValueError, TypeError) as e:
            return {
                "success": False,
                "error": f"Failed to list notes: {e}",
                "notes": [],
                "total_count": 0,
            }
@register_tool(sandbox_execution=False)
def get_note(note_id: str) -> dict[str, Any]:
with _notes_lock:
try:
_ensure_notes_loaded()
if not note_id or not note_id.strip():
return {
"success": False,
"error": "Note ID cannot be empty",
"note": None,
}
note = _notes_storage.get(note_id)
if note is None:
return {
"success": False,
"error": f"Note with ID '{note_id}' not found",
"note": None,
}
note_with_id = note.copy()
note_with_id["note_id"] = note_id
if note.get("category") == "wiki":
note_with_id["wiki_kind"] = _infer_wiki_kind(note)
except (ValueError, TypeError) as e:
return {
"success": False,
"error": f"Failed to get note: {e}",
"note": None,
}
else:
return {"success": True, "note": note_with_id}
def append_note_content(note_id: str, delta: str) -> dict[str, Any]:
with _notes_lock:
try:
_ensure_notes_loaded()
if note_id not in _notes_storage:
return {"success": False, "error": f"Note with ID '{note_id}' not found"}
if not isinstance(delta, str):
return {"success": False, "error": "Delta must be a string"}
note = _notes_storage[note_id]
existing_content = str(note.get("content") or "")
updated_content = f"{existing_content.rstrip()}{delta}"
return update_note(note_id=note_id, content=updated_content)
except (ValueError, TypeError) as e:
return {"success": False, "error": f"Failed to append note content: {e}"}
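`append_note_content` composes the new body by right-stripping the existing content and concatenating the delta verbatim, so the caller controls the separator (for example, leading the delta with `\n\n` to start a new section). A sketch of just that composition step:

```python
def compose_appended_content(existing: str, delta: str) -> str:
    # Right-strip the existing body, then append the delta as-is;
    # any separator (newlines, headings) is supplied inside the delta.
    return f"{existing.rstrip()}{delta}"


combined = compose_appended_content(
    "Existing wiki content\n\n",
    "\n\n## Agent Update\nFound sink",
)
assert combined == "Existing wiki content\n\n## Agent Update\nFound sink"
```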
@register_tool(sandbox_execution=False)
@@ -116,49 +458,70 @@ def update_note(
content: str | None = None,
tags: list[str] | None = None,
) -> dict[str, Any]:
    with _notes_lock:
        try:
            _ensure_notes_loaded()
            if note_id not in _notes_storage:
                return {"success": False, "error": f"Note with ID '{note_id}' not found"}
            note = _notes_storage[note_id]
            if title is not None:
                if not title.strip():
                    return {"success": False, "error": "Title cannot be empty"}
                note["title"] = title.strip()
            if content is not None:
                if not content.strip():
                    return {"success": False, "error": "Content cannot be empty"}
                note["content"] = content.strip()
            if tags is not None:
                note["tags"] = tags
            note["updated_at"] = datetime.now(UTC).isoformat()
            _append_note_event("update", note_id, note)
            if note.get("category") == "wiki":
                _persist_wiki_note(note_id, note)
                _persist_wiki_index()
        except (ValueError, TypeError) as e:
            return {"success": False, "error": f"Failed to update note: {e}"}
        except OSError as e:
            return {"success": False, "error": f"Failed to persist wiki note: {e}"}
        return {
            "success": True,
            "message": f"Note '{note['title']}' updated successfully",
        }
@register_tool(sandbox_execution=False)
def delete_note(note_id: str) -> dict[str, Any]:
    with _notes_lock:
        try:
            _ensure_notes_loaded()
            if note_id not in _notes_storage:
                return {"success": False, "error": f"Note with ID '{note_id}' not found"}
            note = _notes_storage[note_id]
            note_title = note["title"]
            is_wiki = note.get("category") == "wiki"
            if is_wiki:
                _remove_wiki_note(note_id, note)
            del _notes_storage[note_id]
            _append_note_event("delete", note_id)
            if is_wiki:
                _persist_wiki_index()
        except (ValueError, TypeError) as e:
            return {"success": False, "error": f"Failed to delete note: {e}"}
        except OSError as e:
            return {"success": False, "error": f"Failed to delete wiki note: {e}"}
        else:
            return {
                "success": True,
                "message": f"Note '{note_title}' deleted successfully",
            }

View File

@@ -2,7 +2,9 @@
<tool name="create_note">
<description>Create a personal note for observations, findings, and research during the scan.</description>
<details>Use this tool for documenting discoveries, observations, methodology notes, and questions.
This is your personal notepad for recording information you want to remember or reference later.
This is your personal and shared run memory for recording information you want to remember or reference later.
Use category "wiki" for repository source maps shared across agents in the same run.
For Codewiki patterns, prefer wiki tags `wiki:overview` and `wiki:security` for stable note roles.
For tracking actionable tasks, use the todo tool instead.</details>
<parameters>
<parameter name="title" type="string" required="true">
@@ -12,7 +14,7 @@
<description>Content of the note</description>
</parameter>
<parameter name="category" type="string" required="false">
<description>Category to organize the note (default: "general", "findings", "methodology", "questions", "plan")</description>
<description>Category to organize the note: "general" (default), "findings", "methodology", "questions", "plan", "wiki"</description>
</parameter>
<parameter name="tags" type="string" required="false">
<description>Tags for categorization</description>
@@ -92,7 +94,7 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
</examples>
</tool>
<tool name="list_notes">
<description>List existing notes with optional filtering and search.</description>
<description>List existing notes with optional filtering and search (metadata-first by default).</description>
<parameters>
<parameter name="category" type="string" required="false">
<description>Filter by category</description>
@@ -103,9 +105,12 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
<parameter name="search" type="string" required="false">
<description>Search query to find in note titles and content</description>
</parameter>
<parameter name="include_content" type="boolean" required="false">
<description>Include full note content in each list item (default: false)</description>
</parameter>
</parameters>
<returns type="Dict[str, Any]">
<description>Response containing: - notes: List of matching notes - total_count: Total number of notes found</description>
<description>Response containing: - notes: List of matching notes (metadata + optional content/content_preview; wiki entries include wiki_kind) - total_count: Total number of notes found</description>
</returns>
<examples>
# List all findings
@@ -122,6 +127,28 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
<function=list_notes>
<parameter=search>admin</parameter>
<parameter=category>findings</parameter>
</function>
# Load shared repository wiki notes
<function=list_notes>
<parameter=category>wiki</parameter>
</function>
</examples>
</tool>
<tool name="get_note">
<description>Get a single note by ID, including full content.</description>
<parameters>
<parameter name="note_id" type="string" required="true">
<description>ID of the note to fetch</description>
</parameter>
</parameters>
<returns type="Dict[str, Any]">
<description>Response containing: - note: Note object including content - success: Whether note lookup succeeded</description>
</returns>
<examples>
# Read a specific wiki note after listing note IDs
<function=get_note>
<parameter=note_id>abc12</parameter>
</function>
</examples>
</tool>

View File

@@ -0,0 +1,153 @@
import importlib.util
from pathlib import Path
import pytest
def _load_utils_module():
module_path = Path(__file__).resolve().parents[2] / "strix" / "interface" / "utils.py"
spec = importlib.util.spec_from_file_location("strix_interface_utils_test", module_path)
if spec is None or spec.loader is None:
raise RuntimeError("Failed to load strix.interface.utils for tests")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
utils = _load_utils_module()
def test_parse_name_status_uses_rename_destination_path() -> None:
raw = (
b"R100\x00old/path.py\x00new/path.py\x00"
b"R75\x00legacy/module.py\x00modern/module.py\x00"
b"M\x00src/app.py\x00"
b"A\x00src/new_file.py\x00"
b"D\x00src/deleted.py\x00"
)
entries = utils._parse_name_status_z(raw)
classified = utils._classify_diff_entries(entries)
assert "new/path.py" in classified["analyzable_files"]
assert "old/path.py" not in classified["analyzable_files"]
assert "modern/module.py" in classified["analyzable_files"]
assert classified["renamed_files"][0]["old_path"] == "old/path.py"
assert classified["renamed_files"][0]["new_path"] == "new/path.py"
assert "src/deleted.py" in classified["deleted_files"]
assert "src/deleted.py" not in classified["analyzable_files"]
def test_build_diff_scope_instruction_includes_added_modified_and_deleted_guidance() -> None:
scope = utils.RepoDiffScope(
source_path="/tmp/repo",
workspace_subdir="repo",
base_ref="refs/remotes/origin/main",
merge_base="abc123",
added_files=["src/added.py"],
modified_files=["src/changed.py"],
renamed_files=[{"old_path": "src/old.py", "new_path": "src/new.py", "similarity": 90}],
deleted_files=["src/deleted.py"],
analyzable_files=["src/added.py", "src/changed.py", "src/new.py"],
)
instruction = utils.build_diff_scope_instruction([scope])
assert "For Added files, review the entire file content." in instruction
assert "For Modified files, focus primarily on the changed areas." in instruction
assert "Note: These files were deleted" in instruction
assert "src/deleted.py" in instruction
assert "src/old.py -> src/new.py" in instruction
def test_resolve_base_ref_prefers_github_base_ref(monkeypatch) -> None:
calls: list[str] = []
def fake_ref_exists(_repo_path: Path, ref: str) -> bool:
calls.append(ref)
return ref == "refs/remotes/origin/release-2026"
monkeypatch.setattr(utils, "_git_ref_exists", fake_ref_exists)
monkeypatch.setattr(utils, "_extract_github_base_sha", lambda _env: None)
monkeypatch.setattr(utils, "_resolve_origin_head_ref", lambda _repo_path: None)
base_ref = utils._resolve_base_ref(
Path("/tmp/repo"),
diff_base=None,
env={"GITHUB_BASE_REF": "release-2026"},
)
assert base_ref == "refs/remotes/origin/release-2026"
assert calls[0] == "refs/remotes/origin/release-2026"
def test_resolve_base_ref_falls_back_to_remote_main(monkeypatch) -> None:
calls: list[str] = []
def fake_ref_exists(_repo_path: Path, ref: str) -> bool:
calls.append(ref)
return ref == "refs/remotes/origin/main"
monkeypatch.setattr(utils, "_git_ref_exists", fake_ref_exists)
monkeypatch.setattr(utils, "_extract_github_base_sha", lambda _env: None)
monkeypatch.setattr(utils, "_resolve_origin_head_ref", lambda _repo_path: None)
base_ref = utils._resolve_base_ref(Path("/tmp/repo"), diff_base=None, env={})
assert base_ref == "refs/remotes/origin/main"
assert "refs/remotes/origin/main" in calls
assert "origin/main" not in calls
def test_resolve_diff_scope_context_auto_degrades_when_repo_scope_resolution_fails(
monkeypatch,
) -> None:
source = {"source_path": "/tmp/repo", "workspace_subdir": "repo"}
monkeypatch.setattr(utils, "_should_activate_auto_scope", lambda *_args, **_kwargs: True)
monkeypatch.setattr(utils, "_is_git_repo", lambda _repo_path: True)
monkeypatch.setattr(
utils,
"_resolve_repo_diff_scope",
lambda *_args, **_kwargs: (_ for _ in ()).throw(ValueError("shallow history")),
)
result = utils.resolve_diff_scope_context(
local_sources=[source],
scope_mode="auto",
diff_base=None,
non_interactive=True,
env={},
)
assert result.active is False
assert result.mode == "auto"
assert result.metadata["active"] is False
assert result.metadata["mode"] == "auto"
assert "skipped_diff_scope_sources" in result.metadata
assert result.metadata["skipped_diff_scope_sources"] == [
"/tmp/repo (diff-scope skipped: shallow history)"
]
def test_resolve_diff_scope_context_diff_mode_still_raises_on_repo_scope_resolution_failure(
monkeypatch,
) -> None:
source = {"source_path": "/tmp/repo", "workspace_subdir": "repo"}
monkeypatch.setattr(utils, "_is_git_repo", lambda _repo_path: True)
monkeypatch.setattr(
utils,
"_resolve_repo_diff_scope",
lambda *_args, **_kwargs: (_ for _ in ()).throw(ValueError("shallow history")),
)
with pytest.raises(ValueError, match="shallow history"):
utils.resolve_diff_scope_context(
local_sources=[source],
scope_mode="diff",
diff_base=None,
non_interactive=True,
env={},
)

View File

@@ -0,0 +1,30 @@
from strix.llm.config import LLMConfig
from strix.llm.llm import LLM
def test_llm_config_whitebox_defaults_to_false(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
config = LLMConfig()
assert config.is_whitebox is False
def test_llm_config_whitebox_can_be_enabled(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
config = LLMConfig(is_whitebox=True)
assert config.is_whitebox is True
def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=True), agent_name="StrixAgent")
assert "<source_aware_whitebox>" in whitebox_llm.system_prompt
assert "<source_aware_sast>" in whitebox_llm.system_prompt
assert "Begin with fast source triage" in whitebox_llm.system_prompt
assert "You MUST begin at the very first step by running the code and testing live." not in (
whitebox_llm.system_prompt
)
non_whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=False), agent_name="StrixAgent")
assert "<source_aware_whitebox>" not in non_whitebox_llm.system_prompt
assert "<source_aware_sast>" not in non_whitebox_llm.system_prompt

View File

@@ -0,0 +1,415 @@
from types import SimpleNamespace
import strix.agents as agents_module
from strix.llm.config import LLMConfig
from strix.tools.agents_graph import agents_graph_actions
def test_create_agent_inherits_parent_whitebox_flag(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
agents_graph_actions._agent_graph["nodes"].clear()
agents_graph_actions._agent_graph["edges"].clear()
agents_graph_actions._agent_messages.clear()
agents_graph_actions._running_agents.clear()
agents_graph_actions._agent_instances.clear()
agents_graph_actions._agent_states.clear()
parent_id = "parent-agent"
parent_llm = LLMConfig(timeout=123, scan_mode="standard", is_whitebox=True)
agents_graph_actions._agent_instances[parent_id] = SimpleNamespace(
llm_config=parent_llm,
non_interactive=True,
)
captured_config: dict[str, object] = {}
class FakeStrixAgent:
def __init__(self, config: dict[str, object]):
captured_config["agent_config"] = config
class FakeThread:
def __init__(self, target, args, daemon, name):
self.target = target
self.args = args
self.daemon = daemon
self.name = name
def start(self) -> None:
return None
monkeypatch.setattr(agents_module, "StrixAgent", FakeStrixAgent)
monkeypatch.setattr(agents_graph_actions.threading, "Thread", FakeThread)
agent_state = SimpleNamespace(
agent_id=parent_id,
get_conversation_history=list,
)
result = agents_graph_actions.create_agent(
agent_state=agent_state,
task="source-aware child task",
name="SourceAwareChild",
inherit_context=False,
)
assert result["success"] is True
llm_config = captured_config["agent_config"]["llm_config"]
assert isinstance(llm_config, LLMConfig)
assert llm_config.timeout == 123
assert llm_config.scan_mode == "standard"
assert llm_config.is_whitebox is True
child_task = captured_config["agent_config"]["state"].task
assert "White-box execution guidance (recommended when source is available):" in child_task
assert "mandatory" not in child_task.lower()
def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
agents_graph_actions._agent_graph["nodes"].clear()
agents_graph_actions._agent_graph["edges"].clear()
agents_graph_actions._agent_messages.clear()
agents_graph_actions._running_agents.clear()
agents_graph_actions._agent_instances.clear()
agents_graph_actions._agent_states.clear()
parent_id = "parent-1"
child_id = "child-1"
agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"}
agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"}
class FakeState:
def __init__(self) -> None:
self.agent_id = child_id
self.agent_name = "Child"
self.parent_id = parent_id
self.task = "analyze source risks"
self.stop_requested = False
self.messages: list[tuple[str, str]] = []
def add_message(self, role: str, content: str) -> None:
self.messages.append((role, content))
def model_dump(self) -> dict[str, str]:
return {"agent_id": self.agent_id}
class FakeAgent:
def __init__(self) -> None:
self.llm_config = LLMConfig(is_whitebox=True)
async def agent_loop(self, _task: str) -> dict[str, bool]:
return {"ok": True}
state = FakeState()
agent = FakeAgent()
agents_graph_actions._agent_instances[child_id] = agent
result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[])
assert result["result"] == {"ok": True}
task_messages = [msg for role, msg in state.messages if role == "user"]
assert task_messages
assert 'list_notes(category="wiki")' in task_messages[-1]
assert "get_note(note_id=...)" in task_messages[-1]
assert "Before agent_finish" in task_messages[-1]
def test_agent_finish_appends_wiki_update_for_whitebox(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
agents_graph_actions._agent_graph["nodes"].clear()
agents_graph_actions._agent_graph["edges"].clear()
agents_graph_actions._agent_messages.clear()
agents_graph_actions._running_agents.clear()
agents_graph_actions._agent_instances.clear()
agents_graph_actions._agent_states.clear()
parent_id = "parent-2"
child_id = "child-2"
agents_graph_actions._agent_graph["nodes"][parent_id] = {
"name": "Parent",
"task": "parent task",
"status": "running",
"parent_id": None,
}
agents_graph_actions._agent_graph["nodes"][child_id] = {
"name": "Child",
"task": "child task",
"status": "running",
"parent_id": parent_id,
}
agents_graph_actions._agent_instances[child_id] = SimpleNamespace(
llm_config=LLMConfig(is_whitebox=True)
)
captured: dict[str, str] = {}
def fake_list_notes(category=None):
assert category == "wiki"
return {
"success": True,
"notes": [{"note_id": "wiki-note-1", "content": "Existing wiki content"}],
"total_count": 1,
}
captured_get: dict[str, str] = {}
def fake_get_note(note_id: str):
captured_get["note_id"] = note_id
return {
"success": True,
"note": {
"note_id": note_id,
"title": "Repo Wiki",
"content": "Existing wiki content",
},
}
def fake_append_note_content(note_id: str, delta: str):
captured["note_id"] = note_id
captured["delta"] = delta
return {"success": True, "note_id": note_id}
monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
monkeypatch.setattr("strix.tools.notes.notes_actions.append_note_content", fake_append_note_content)
state = SimpleNamespace(agent_id=child_id, parent_id=parent_id)
result = agents_graph_actions.agent_finish(
agent_state=state,
result_summary="AST pass completed",
findings=["Found route sink candidate"],
success=True,
final_recommendations=["Validate sink with dynamic PoC"],
)
assert result["agent_completed"] is True
assert captured_get["note_id"] == "wiki-note-1"
assert captured["note_id"] == "wiki-note-1"
assert "Agent Update: Child" in captured["delta"]
assert "AST pass completed" in captured["delta"]
def test_run_agent_in_thread_injects_shared_wiki_context_in_whitebox(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
agents_graph_actions._agent_graph["nodes"].clear()
agents_graph_actions._agent_graph["edges"].clear()
agents_graph_actions._agent_messages.clear()
agents_graph_actions._running_agents.clear()
agents_graph_actions._agent_instances.clear()
agents_graph_actions._agent_states.clear()
parent_id = "parent-3"
child_id = "child-3"
agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"}
agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"}
class FakeState:
def __init__(self) -> None:
self.agent_id = child_id
self.agent_name = "Child"
self.parent_id = parent_id
self.task = "map source"
self.stop_requested = False
self.messages: list[tuple[str, str]] = []
def add_message(self, role: str, content: str) -> None:
self.messages.append((role, content))
def model_dump(self) -> dict[str, str]:
return {"agent_id": self.agent_id}
class FakeAgent:
def __init__(self) -> None:
self.llm_config = LLMConfig(is_whitebox=True)
async def agent_loop(self, _task: str) -> dict[str, bool]:
return {"ok": True}
captured_get: dict[str, str] = {}
def fake_list_notes(category=None):
assert category == "wiki"
return {
"success": True,
"notes": [{"note_id": "wiki-ctx-1"}],
"total_count": 1,
}
def fake_get_note(note_id: str):
captured_get["note_id"] = note_id
return {
"success": True,
"note": {
"note_id": note_id,
"title": "Shared Repo Wiki",
"content": "Architecture: server/client split",
},
}
monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
state = FakeState()
agent = FakeAgent()
agents_graph_actions._agent_instances[child_id] = agent
result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[])
assert result["result"] == {"ok": True}
assert captured_get["note_id"] == "wiki-ctx-1"
user_messages = [content for role, content in state.messages if role == "user"]
assert user_messages
assert "<shared_repo_wiki" in user_messages[0]
assert "Architecture: server/client split" in user_messages[0]
def test_load_primary_wiki_note_prefers_repo_tag_match(monkeypatch) -> None:
selected_note_ids: list[str] = []
def fake_list_notes(category=None):
assert category == "wiki"
return {
"success": True,
"notes": [
{"note_id": "wiki-other", "tags": ["repo:other"]},
{"note_id": "wiki-target", "tags": ["repo:appsmith"]},
],
"total_count": 2,
}
def fake_get_note(note_id: str):
selected_note_ids.append(note_id)
return {
"success": True,
"note": {"note_id": note_id, "title": "Repo Wiki", "content": "content"},
}
monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
agent_state = SimpleNamespace(
task="analyze /workspace/appsmith",
context={"whitebox_repo_tags": ["repo:appsmith"]},
)
note = agents_graph_actions._load_primary_wiki_note(agent_state)
assert note is not None
assert note["note_id"] == "wiki-target"
assert selected_note_ids == ["wiki-target"]
def test_load_primary_wiki_note_prefers_requested_wiki_kind(monkeypatch) -> None:
selected_note_ids: list[str] = []
def fake_list_notes(category=None):
assert category == "wiki"
return {
"success": True,
"notes": [
{"note_id": "wiki-security", "tags": ["repo:appsmith", "wiki:security"]},
{"note_id": "wiki-overview", "tags": ["repo:appsmith", "wiki:overview"]},
],
"total_count": 2,
}
def fake_get_note(note_id: str):
selected_note_ids.append(note_id)
return {
"success": True,
"note": {
"note_id": note_id,
"title": "Repo Wiki",
"content": "content",
},
}
monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
agent_state = SimpleNamespace(task="analyze /workspace/appsmith")
overview_note = agents_graph_actions._load_primary_wiki_note(
agent_state,
preferred_kind="overview",
allow_kind_fallback=False,
)
security_note = agents_graph_actions._load_primary_wiki_note(
agent_state,
preferred_kind="security",
allow_kind_fallback=True,
)
assert overview_note is not None
assert security_note is not None
assert overview_note["note_id"] == "wiki-overview"
assert security_note["note_id"] == "wiki-security"
assert selected_note_ids == ["wiki-overview", "wiki-security"]
def test_agent_finish_prefers_security_wiki_for_append(monkeypatch) -> None:
monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")
agents_graph_actions._agent_graph["nodes"].clear()
agents_graph_actions._agent_graph["edges"].clear()
agents_graph_actions._agent_messages.clear()
agents_graph_actions._running_agents.clear()
agents_graph_actions._agent_instances.clear()
agents_graph_actions._agent_states.clear()
parent_id = "parent-sec"
child_id = "child-sec"
agents_graph_actions._agent_graph["nodes"][parent_id] = {
"name": "Parent",
"task": "parent task",
"status": "running",
"parent_id": None,
}
agents_graph_actions._agent_graph["nodes"][child_id] = {
"name": "Child",
"task": "child task",
"status": "running",
"parent_id": parent_id,
}
agents_graph_actions._agent_instances[child_id] = SimpleNamespace(
llm_config=LLMConfig(is_whitebox=True)
)
captured: dict[str, str] = {}
def fake_list_notes(category=None):
assert category == "wiki"
return {
"success": True,
"notes": [
{"note_id": "wiki-overview", "tags": ["repo:appsmith", "wiki:overview"]},
{"note_id": "wiki-security", "tags": ["repo:appsmith", "wiki:security"]},
],
"total_count": 2,
}
def fake_get_note(note_id: str):
return {
"success": True,
"note": {"note_id": note_id, "title": "Repo Wiki", "content": "Existing wiki content"},
}
def fake_append_note_content(note_id: str, delta: str):
captured["note_id"] = note_id
captured["delta"] = delta
return {"success": True, "note_id": note_id}
monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
monkeypatch.setattr("strix.tools.notes.notes_actions.append_note_content", fake_append_note_content)
state = SimpleNamespace(agent_id=child_id, parent_id=parent_id, task="analyze /workspace/appsmith")
result = agents_graph_actions.agent_finish(
agent_state=state,
result_summary="Static triage completed",
findings=["Found candidate sink"],
success=True,
final_recommendations=["Validate with dynamic PoC"],
)
assert result["agent_completed"] is True
assert captured["note_id"] == "wiki-security"
assert "Static triage completed" in captured["delta"]

@@ -0,0 +1,267 @@
from pathlib import Path
from strix.telemetry.tracer import Tracer, get_global_tracer, set_global_tracer
from strix.tools.notes import notes_actions


def _reset_notes_state() -> None:
notes_actions._notes_storage.clear()
notes_actions._loaded_notes_run_dir = None


def test_wiki_notes_are_persisted_and_removed(tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("wiki-test-run")
set_global_tracer(tracer)
try:
created = notes_actions.create_note(
title="Repo Map",
content="## Architecture\n- monolith",
category="wiki",
tags=["source-map"],
)
assert created["success"] is True
note_id = created["note_id"]
assert isinstance(note_id, str)
note = notes_actions._notes_storage[note_id]
wiki_filename = note.get("wiki_filename")
assert isinstance(wiki_filename, str)
wiki_path = tmp_path / "strix_runs" / "wiki-test-run" / "wiki" / wiki_filename
assert wiki_path.exists()
assert "## Architecture" in wiki_path.read_text(encoding="utf-8")
updated = notes_actions.update_note(
note_id=note_id,
content="## Architecture\n- service-oriented",
)
assert updated["success"] is True
assert "service-oriented" in wiki_path.read_text(encoding="utf-8")
deleted = notes_actions.delete_note(note_id=note_id)
assert deleted["success"] is True
assert wiki_path.exists() is False
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]


def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("notes-replay-run")
set_global_tracer(tracer)
try:
created = notes_actions.create_note(
title="Auth findings",
content="initial finding",
category="findings",
tags=["auth"],
)
assert created["success"] is True
note_id = created["note_id"]
assert isinstance(note_id, str)
notes_path = tmp_path / "strix_runs" / "notes-replay-run" / "notes" / "notes.jsonl"
assert notes_path.exists() is True
_reset_notes_state()
listed = notes_actions.list_notes(category="findings")
assert listed["success"] is True
assert listed["total_count"] == 1
assert listed["notes"][0]["note_id"] == note_id
assert "content" not in listed["notes"][0]
assert "content_preview" in listed["notes"][0]
updated = notes_actions.update_note(note_id=note_id, content="updated finding")
assert updated["success"] is True
_reset_notes_state()
listed_after_update = notes_actions.list_notes(search="updated finding")
assert listed_after_update["success"] is True
assert listed_after_update["total_count"] == 1
assert listed_after_update["notes"][0]["note_id"] == note_id
assert listed_after_update["notes"][0]["content_preview"] == "updated finding"
listed_with_content = notes_actions.list_notes(
category="findings",
include_content=True,
)
assert listed_with_content["success"] is True
assert listed_with_content["total_count"] == 1
assert listed_with_content["notes"][0]["content"] == "updated finding"
deleted = notes_actions.delete_note(note_id=note_id)
assert deleted["success"] is True
_reset_notes_state()
listed_after_delete = notes_actions.list_notes(category="findings")
assert listed_after_delete["success"] is True
assert listed_after_delete["total_count"] == 0
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]


def test_get_note_returns_full_note(tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("get-note-run")
set_global_tracer(tracer)
try:
created = notes_actions.create_note(
title="Repo wiki",
content="entrypoints and sinks",
category="wiki",
tags=["repo:appsmith", "wiki:security"],
)
assert created["success"] is True
note_id = created["note_id"]
assert isinstance(note_id, str)
result = notes_actions.get_note(note_id=note_id)
assert result["success"] is True
assert result["note"]["note_id"] == note_id
assert result["note"]["content"] == "entrypoints and sinks"
assert result["note"]["wiki_kind"] == "security"
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]


def test_append_note_content_appends_delta(tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("append-note-run")
set_global_tracer(tracer)
try:
created = notes_actions.create_note(
title="Repo wiki",
content="base",
category="wiki",
tags=["repo:demo"],
)
assert created["success"] is True
note_id = created["note_id"]
assert isinstance(note_id, str)
appended = notes_actions.append_note_content(
note_id=note_id,
delta="\n\n## Agent Update: worker\nSummary: done",
)
assert appended["success"] is True
loaded = notes_actions.get_note(note_id=note_id)
assert loaded["success"] is True
assert loaded["note"]["content"] == "base\n\n## Agent Update: worker\nSummary: done"
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]


def test_list_and_get_note_handle_wiki_repersist_oserror_gracefully(
tmp_path: Path, monkeypatch
) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("wiki-repersist-oserror-run")
set_global_tracer(tracer)
try:
created = notes_actions.create_note(
title="Repo wiki",
content="initial wiki content",
category="wiki",
tags=["repo:demo"],
)
assert created["success"] is True
note_id = created["note_id"]
assert isinstance(note_id, str)
_reset_notes_state()
def _raise_oserror(*_args, **_kwargs) -> None:
raise OSError("disk full")
monkeypatch.setattr(notes_actions, "_persist_wiki_note", _raise_oserror)
listed = notes_actions.list_notes(category="wiki")
assert listed["success"] is True
assert listed["total_count"] == 1
assert listed["notes"][0]["note_id"] == note_id
fetched = notes_actions.get_note(note_id=note_id)
assert fetched["success"] is True
assert fetched["note"]["note_id"] == note_id
assert fetched["note"]["content"] == "initial wiki content"
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]


def test_wiki_index_tracks_overview_and_security_notes(tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
_reset_notes_state()
previous_tracer = get_global_tracer()
tracer = Tracer("wiki-index-run")
set_global_tracer(tracer)
try:
overview = notes_actions.create_note(
title="Repo overview wiki",
content="architecture and entrypoints",
category="wiki",
tags=["repo:demo", "wiki:overview"],
)
assert overview["success"] is True
overview_id = overview["note_id"]
assert isinstance(overview_id, str)
security = notes_actions.create_note(
title="Repo security wiki",
content="scanner summary and follow-ups",
category="wiki",
tags=["repo:demo", "wiki:security"],
)
assert security["success"] is True
security_id = security["note_id"]
assert isinstance(security_id, str)
wiki_index = tmp_path / "strix_runs" / "wiki-index-run" / "wiki" / "index.json"
assert wiki_index.exists() is True
index_data = wiki_index.read_text(encoding="utf-8")
assert '"wiki_kind": "overview"' in index_data
assert '"wiki_kind": "security"' in index_data
listed = notes_actions.list_notes(category="wiki")
assert listed["success"] is True
note_kinds = {note["note_id"]: note.get("wiki_kind") for note in listed["notes"]}
assert note_kinds[overview_id] == "overview"
assert note_kinds[security_id] == "security"
deleted = notes_actions.delete_note(note_id=overview_id)
assert deleted["success"] is True
index_after_delete = wiki_index.read_text(encoding="utf-8")
assert overview_id not in index_after_delete
assert security_id in index_after_delete
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]
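The replay test above relies on an append-only `notes.jsonl` log: every create, update, and delete is written as one JSON record, and a cold start rebuilds the in-memory store by replaying the log in order. A rough sketch of that pattern under those assumptions (record and function names are illustrative, not the project's API):

```python
import json
from pathlib import Path


def append_event(log_path: Path, op: str, note: dict) -> None:
    # One JSON object per line; appending keeps writes cheap and crash-tolerant.
    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps({"op": op, "note": note}) + "\n")


def replay(log_path: Path) -> dict[str, dict]:
    # Rebuild the in-memory store by applying events in write order.
    notes: dict[str, dict] = {}
    if not log_path.exists():
        return notes
    for line in log_path.read_text(encoding="utf-8").splitlines():
        event = json.loads(line)
        note = event["note"]
        if event["op"] == "delete":
            notes.pop(note["note_id"], None)
        else:  # create and update both upsert the latest note state
            notes[note["note_id"]] = note
    return notes
```

Because later records win, an update is just another upsert, and a delete erases the id; this matches what the test observes after each `_reset_notes_state()` call.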