Feat: expanded source-aware testing

This commit is contained in:
bearsyankees
2026-03-23 16:43:58 -04:00
parent b67712beec
commit 69a59890ff
17 changed files with 640 additions and 23 deletions

View File

@@ -101,6 +101,33 @@ RUN npm install -g retire@latest && \
npm install -g @ast-grep/cli@latest && \
npm install -g tree-sitter-cli@latest
# Pre-clone Tree-sitter grammar repositories (Java/JS/Python/Go/Bash/JSON/YAML/TS)
# and register the parser directory in the tree-sitter CLI config so source
# parsing works without cloning grammars at scan time.
# NOTE(review): tree-sitter-yaml is hosted under the tree-sitter-grammars org,
# the others under tree-sitter. The typescript repo ships two grammars
# (typescript, tsx); the symlinks expose each as its own parser directory entry.
# Assumes `jq` is already installed in the image — TODO confirm.
RUN set -eux; \
    TS_PARSER_DIR="/home/pentester/.tree-sitter/parsers"; \
    mkdir -p "${TS_PARSER_DIR}"; \
    for repo in tree-sitter-java tree-sitter-javascript tree-sitter-python tree-sitter-go tree-sitter-bash tree-sitter-json tree-sitter-yaml tree-sitter-typescript; do \
    if [ "$repo" = "tree-sitter-yaml" ]; then \
    repo_url="https://github.com/tree-sitter-grammars/${repo}.git"; \
    else \
    repo_url="https://github.com/tree-sitter/${repo}.git"; \
    fi; \
    if [ ! -d "${TS_PARSER_DIR}/${repo}" ]; then \
    git clone --depth 1 "${repo_url}" "${TS_PARSER_DIR}/${repo}"; \
    fi; \
    done; \
    if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" ]; then \
    ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/typescript" "${TS_PARSER_DIR}/tree-sitter-typescript-typescript"; \
    fi; \
    if [ -d "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" ]; then \
    ln -sfn "${TS_PARSER_DIR}/tree-sitter-typescript/tsx" "${TS_PARSER_DIR}/tree-sitter-typescript-tsx"; \
    fi; \
    tree-sitter init-config >/dev/null 2>&1 || true; \
    TS_CONFIG="/home/pentester/.config/tree-sitter/config.json"; \
    mkdir -p "$(dirname "${TS_CONFIG}")"; \
    [ -f "${TS_CONFIG}" ] || printf '{}\n' > "${TS_CONFIG}"; \
    TMP_CFG="$(mktemp)"; \
    jq --arg p "${TS_PARSER_DIR}" '.["parser-directories"] = ((.["parser-directories"] // []) + [$p] | unique)' "${TS_CONFIG}" > "${TMP_CFG}"; \
    mv "${TMP_CFG}" "${TS_CONFIG}"
WORKDIR /home/pentester/tools
RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
chmod +x JS-Snooper/js_snooper.sh && \
@@ -112,7 +139,18 @@ RUN git clone https://github.com/aravind0x7/JS-Snooper.git && \
USER root
RUN curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin
RUN curl -sSfL https://raw.githubusercontent.com/gitleaks/gitleaks/master/install.sh | sh -s -- -b /usr/local/bin
# Install gitleaks from the latest GitHub release for the build architecture.
# NOTE(review): the binary installed here overwrites /usr/local/bin/gitleaks
# placed by the install.sh step above — confirm whether both steps are needed.
# Release assets follow the gitleaks_<ver>_linux_<arch>.tar.gz naming; ${TAG#v}
# strips the leading "v" from the tag to build that version string.
RUN set -eux; \
    ARCH="$(uname -m)"; \
    case "$ARCH" in \
    x86_64) GITLEAKS_ARCH="x64" ;; \
    aarch64|arm64) GITLEAKS_ARCH="arm64" ;; \
    *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \
    esac; \
    TAG="$(curl -fsSL https://api.github.com/repos/gitleaks/gitleaks/releases/latest | jq -r .tag_name)"; \
    curl -fsSL "https://github.com/gitleaks/gitleaks/releases/download/${TAG}/gitleaks_${TAG#v}_linux_${GITLEAKS_ARCH}.tar.gz" -o /tmp/gitleaks.tgz; \
    tar -xzf /tmp/gitleaks.tgz -C /tmp; \
    install -m 0755 /tmp/gitleaks /usr/local/bin/gitleaks; \
    rm -f /tmp/gitleaks /tmp/gitleaks.tgz
RUN apt-get update && apt-get install -y zaproxy

View File

@@ -51,7 +51,7 @@ Strix runs inside a Kali Linux-based Docker container with a comprehensive set o
| ------------------------------------------------------- | --------------------------------------------- |
| [Semgrep](https://github.com/semgrep/semgrep) | Fast SAST and custom rule matching |
| [ast-grep](https://ast-grep.github.io) | Structural AST/CST-aware code search (`sg`) |
| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction |
| [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) | Syntax tree parsing and symbol extraction (Java/JS/TS/Python/Go/Bash/JSON/YAML grammars pre-configured) |
| [Bandit](https://bandit.readthedocs.io) | Python security linter |
## Secret Detection

View File

@@ -91,7 +91,11 @@ BLACK-BOX TESTING (domain/subdomain only):
WHITE-BOX TESTING (code provided):
- MUST perform BOTH static AND dynamic analysis
- Static: Use source-aware triage first to map risk quickly (`semgrep`, `ast-grep`, Tree-sitter tooling, `gitleaks`, `trufflehog`, `trivy fs`). Then review code for vulnerabilities
- Shared memory: Use notes as shared working memory; check existing `wiki` notes first (`list_notes`), then update one repo wiki note instead of creating duplicates
- Static coverage floor: execute at least one structural AST mapping pass (`sg` and/or Tree-sitter) per repository and keep artifact output
- Static coverage target per repository: run one `semgrep` pass, one secrets pass (`gitleaks` and/or `trufflehog`), one `trivy fs` pass, and one AST-structural pass (`sg` and/or Tree-sitter); if any are skipped, record why in the shared wiki
- Keep AST artifacts bounded and high-signal: scope to relevant paths/hypotheses, avoid whole-repo generic function dumps
- Shared memory: Use notes as shared working memory; discover wiki notes with `list_notes`, then read the selected one via `get_note(note_id=...)` before analysis
- Before `agent_finish`/`finish_scan`, update the shared repo wiki with scanner summaries, key routes/sinks, and dynamic follow-up plan
- Dynamic: Run the application and test live to validate exploitability
- NEVER rely solely on static code analysis when dynamic validation is possible
- Begin with fast source triage and dynamic run preparation in parallel; use static findings to prioritize live testing.

View File

@@ -117,6 +117,8 @@ class ListNotesRenderer(BaseToolRenderer):
title = note.get("title", "").strip() or "(untitled)"
category = note.get("category", "general")
note_content = note.get("content", "").strip()
if not note_content:
note_content = note.get("content_preview", "").strip()
text.append("\n - ")
text.append(title)
@@ -131,3 +133,35 @@ class ListNotesRenderer(BaseToolRenderer):
css_classes = cls.get_css_classes("completed")
return Static(text, classes=css_classes)
@register_tool_renderer
class GetNoteRenderer(BaseToolRenderer):
    """Render results of the `get_note` tool: title, category, and content."""

    tool_name: ClassVar[str] = "get_note"
    css_classes: ClassVar[list[str]] = ["tool-call", "notes-tool"]

    @classmethod
    def render(cls, tool_data: dict[str, Any]) -> Static:
        """Build the widget for one `get_note` call.

        Shows a "Loading..." placeholder until a successful result with a
        note payload is available.
        """
        result = tool_data.get("result")

        text = Text()
        text.append("", style="#fbbf24")
        text.append("note read", style="dim")

        if result and isinstance(result, dict) and result.get("success"):
            note = result.get("note", {}) or {}
            title = str(note.get("title", "")).strip() or "(untitled)"
            category = note.get("category", "general")
            content = str(note.get("content", "")).strip()

            text.append("\n ")
            text.append(title)
            text.append(f" ({category})", style="dim")
            if content:
                text.append("\n ")
                text.append(content, style="dim")
        else:
            text.append("\n ")
            text.append("Loading...", style="dim")

        return Static(text, classes=cls.get_css_classes("completed"))

View File

@@ -107,6 +107,7 @@ class LLM:
ordered_skills.append(f"scan_modes/{self.config.scan_mode}")
if self.config.is_whitebox:
ordered_skills.append("coordination/source_aware_whitebox")
ordered_skills.append("source_aware_sast")
deduped: list[str] = []
seen: set[str] = set()

View File

@@ -9,11 +9,11 @@ Use this coordination playbook when repository source code is available.
## Objective
Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is recommended by default, but not mandatory when context suggests a better path.
Increase white-box coverage by combining source-aware triage with dynamic validation. Source-aware tooling is expected by default when source is available.
## Recommended Workflow
1. Build a quick source map before deep exploitation.
1. Build a quick source map before deep exploitation, including at least one AST-structural pass (`sg` or `tree-sitter`) scoped to relevant paths.
2. Run first-pass static triage to rank high-risk paths.
3. Use triage outputs to prioritize dynamic PoC validation.
4. Keep findings evidence-driven: no report without validation.
@@ -27,6 +27,13 @@ Increase white-box coverage by combining source-aware triage with dynamic valida
- `gitleaks` + `trufflehog`: complementary secret detection (working tree and history coverage)
- `trivy fs`: dependency, misconfiguration, license, and secret checks
Coverage target per repository:
- one `semgrep` pass
- one AST structural pass (`sg` and/or `tree-sitter`)
- one secrets pass (`gitleaks` and/or `trufflehog`)
- one `trivy fs` pass
- if any part is skipped, log the reason in the shared wiki note
## Agent Delegation Guidance
- Keep child agents specialized by vulnerability/component as usual.
@@ -38,10 +45,11 @@ Increase white-box coverage by combining source-aware triage with dynamic valida
When source is present, maintain one wiki note per repository and keep it current.
Operational rules:
- At task start, call `list_notes` with `category=wiki` and reuse existing repo wiki if present.
- At task start, call `list_notes` with `category=wiki`, then read the selected wiki with `get_note(note_id=...)`.
- If no repo wiki exists, create one with `create_note` and `category=wiki`.
- Update the same wiki via `update_note`; avoid creating duplicate wiki notes for the same repo.
- Child agents should read wiki notes first, then extend with new evidence from their scope.
- Child agents should read wiki notes first via `get_note`, then extend with new evidence from their scope.
- Before calling `agent_finish`, each source-focused child agent should append a short delta update to the shared repo wiki (scanner outputs, route/sink map deltas, dynamic follow-ups).
Recommended sections:
- Architecture overview

View File

@@ -19,16 +19,42 @@ Before scanning, check shared wiki memory:
```text
1) list_notes(category="wiki")
2) Reuse matching repo wiki note if present
3) create_note(category="wiki") only if missing
2) get_note(note_id=...) for the selected repo wiki before analysis
3) Reuse matching repo wiki note if present
4) create_note(category="wiki") only if missing
```
After every major source-analysis batch, update the same repo wiki note with `update_note` so other agents can reuse your latest map.
## Baseline Coverage Bundle (Recommended)
Run this baseline once per repository before deep narrowing:
```bash
ART=/workspace/.strix-source-aware
mkdir -p "$ART"
semgrep scan --config p/default --config p/golang --config p/secrets \
--metrics=off --json --output "$ART/semgrep.json" .
sg scan --json . > "$ART/ast-grep.json"
gitleaks detect --source . --report-format json --report-path "$ART/gitleaks.json" || true
trufflehog filesystem --no-update --json --no-verification . > "$ART/trufflehog.json" || true
trivy fs --format json --output "$ART/trivy-fs.json" .
```
If one tool is skipped or fails, record that in the shared wiki note along with the reason.
## Semgrep First Pass
Use Semgrep as the default static triage pass:
```bash
semgrep --config auto --json --output /workspace/.strix-source-aware/semgrep.json .
# Preferred deterministic profile set (works with --metrics=off)
semgrep scan --config p/default --config p/golang --config p/secrets \
--metrics=off --json --output /workspace/.strix-source-aware/semgrep.json .
# If you choose auto config, do not combine it with --metrics=off
semgrep scan --config auto --json --output /workspace/.strix-source-aware/semgrep-auto.json .
```
If diff scope is active, restrict to changed files first, then expand only when needed.
@@ -92,6 +118,11 @@ Keep one wiki note per repository and update these sections:
## Dynamic Validation Follow-Ups
```
Before `agent_finish`, make one final `update_note` call to capture:
- scanner artifacts and paths
- top validated/invalidated hypotheses
- concrete dynamic follow-up tasks
## Anti-Patterns
- Do not treat scanner output as final truth.

View File

@@ -15,8 +15,10 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
**Whitebox (source available)**
- Map every file, module, and code path in the repository
- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")`), then continuously update one repo note
- Load and maintain shared `wiki` notes from the start (`list_notes(category="wiki")` then `get_note(note_id=...)`), then continuously update one repo note
- Start with broad source-aware triage (`semgrep`, `ast-grep`, `gitleaks`, `trufflehog`, `trivy fs`) and use outputs to drive deep review
- Execute at least one structural AST pass (`sg` and/or Tree-sitter) per repository and store artifacts for reuse
- Keep AST artifacts bounded and query-driven (target relevant paths/sinks first; avoid whole-repo generic function dumps)
- Use syntax-aware parsing (Tree-sitter tooling) to improve symbol, route, and sink extraction quality
- Trace all entry points from HTTP handlers to database queries
- Document all authentication mechanisms and implementations
@@ -29,6 +31,7 @@ Thorough understanding before exploitation. Test every parameter, every endpoint
- Review file handling: upload, download, processing
- Understand the deployment model and infrastructure assumptions
- Check all dependency versions and repository risks against CVE/misconfiguration data
- Before final completion, update the shared repo wiki with scanner summary + dynamic follow-ups
**Blackbox (no source)**
- Exhaustive subdomain enumeration with multiple sources and tools

View File

@@ -15,12 +15,15 @@ Optimize for fast feedback on critical security issues. Skip exhaustive enumerat
**Whitebox (source available)**
- Focus on recent changes: git diffs, new commits, modified files—these are most likely to contain fresh bugs
- Read existing `wiki` notes first (`list_notes(category="wiki")`) to avoid remapping from scratch
- Read existing `wiki` notes first (`list_notes(category="wiki")` then `get_note(note_id=...)`) to avoid remapping from scratch
- Run a fast static triage on changed files first (`semgrep`, then targeted `sg` queries)
- Run at least one lightweight AST pass (`sg` or Tree-sitter) so structural mapping is not skipped
- Keep AST commands tightly scoped to changed or high-risk paths; avoid broad repository-wide pattern dumps
- Run quick secret and dependency checks (`gitleaks`, `trufflehog`, `trivy fs`) scoped to changed areas when possible
- Identify security-sensitive patterns in changed code: auth checks, input handling, database queries, file operations
- Trace user input through modified code paths
- Check if security controls were modified or bypassed
- Before completion, update the shared repo wiki with what changed and what needs dynamic follow-up
**Blackbox (no source)**
- Map authentication and critical user flows

View File

@@ -15,15 +15,17 @@ Systematic testing across the full attack surface. Understand the application be
**Whitebox (source available)**
- Map codebase structure: modules, entry points, routing
- Start by loading existing `wiki` notes (`list_notes(category="wiki")`) and update one shared repo note as mapping evolves
- Start by loading existing `wiki` notes (`list_notes(category="wiki")` then `get_note(note_id=...)`) and update one shared repo note as mapping evolves
- Run `semgrep` first-pass triage to prioritize risky flows before deep manual review
- Use `ast-grep`/Tree-sitter-assisted structural search for route, sink, and trust-boundary mapping
- Run at least one AST-structural mapping pass (`sg` and/or Tree-sitter), then use outputs for route, sink, and trust-boundary mapping
- Keep AST output bounded to relevant paths and hypotheses; avoid whole-repo generic function dumps
- Identify architecture pattern (MVC, microservices, monolith)
- Trace input vectors: forms, APIs, file uploads, headers, cookies
- Review authentication and authorization flows
- Analyze database interactions and ORM usage
- Check dependencies and repo risks with `trivy fs`, `gitleaks`, and `trufflehog`
- Understand the data model and sensitive data locations
- Before completion, update the shared repo wiki with source findings summary and dynamic validation next steps
**Blackbox (no source)**
- Crawl application thoroughly, interact with every feature

View File

@@ -1,5 +1,6 @@
import threading
from datetime import UTC, datetime
import re
from typing import Any, Literal
from strix.tools.registry import register_tool
@@ -21,6 +22,142 @@ _agent_instances: dict[str, Any] = {}
_agent_states: dict[str, Any] = {}
def _is_whitebox_agent(agent_id: str) -> bool:
    """Return True when the agent registered under ``agent_id`` has a
    white-box LLM configuration (``llm_config.is_whitebox`` truthy)."""
    instance = _agent_instances.get(agent_id)
    llm_config = getattr(instance, "llm_config", None)
    return bool(getattr(llm_config, "is_whitebox", False))
def _extract_repo_tags(agent_state: Any | None) -> set[str]:
repo_tags: set[str] = set()
if agent_state is None:
return repo_tags
task_text = str(getattr(agent_state, "task", "") or "")
for workspace_subdir in re.findall(r"/workspace/([A-Za-z0-9._-]+)", task_text):
repo_tags.add(f"repo:{workspace_subdir.lower()}")
for repo_name in re.findall(r"github\.com/[^/\s]+/([A-Za-z0-9._-]+)", task_text):
normalized = repo_name.removesuffix(".git").lower()
if normalized:
repo_tags.add(f"repo:{normalized}")
return repo_tags
def _load_primary_wiki_note(agent_state: Any | None = None) -> dict[str, Any] | None:
try:
from strix.tools.notes.notes_actions import get_note, list_notes
notes_result = list_notes(category="wiki")
if not notes_result.get("success"):
return None
notes = notes_result.get("notes") or []
if not notes:
return None
selected_note_id = None
repo_tags = _extract_repo_tags(agent_state)
if repo_tags:
for note in notes:
note_tags = note.get("tags") or []
if not isinstance(note_tags, list):
continue
normalized_note_tags = {str(tag).strip().lower() for tag in note_tags if str(tag).strip()}
if normalized_note_tags.intersection(repo_tags):
selected_note_id = note.get("note_id")
break
note_id = selected_note_id or notes[0].get("note_id")
if not isinstance(note_id, str) or not note_id:
return None
note_result = get_note(note_id=note_id)
if not note_result.get("success"):
return None
note = note_result.get("note")
if not isinstance(note, dict):
return None
except Exception:
return None
else:
return note
def _inject_wiki_context_for_whitebox(agent_state: Any) -> None:
    """Prepend shared repo-wiki content to a white-box agent's conversation.

    No-op for non-white-box agents, when no wiki note is found, or when the
    note has no content. Content is capped at 4000 characters so the
    injected context stays bounded.
    """
    if not _is_whitebox_agent(agent_state.agent_id):
        return
    wiki_note = _load_primary_wiki_note(agent_state)
    if not wiki_note:
        return
    title = str(wiki_note.get("title") or "repo wiki")
    content = str(wiki_note.get("content") or "").strip()
    if not content:
        return
    # Fix: escape characters that would break the pseudo-XML attribute below.
    # A quote (or '&'/'<') inside a note title would otherwise corrupt the
    # <shared_repo_wiki> wrapper tag the model is asked to parse.
    safe_title = (
        title.replace("&", "&amp;").replace('"', "&quot;").replace("<", "&lt;")
    )
    max_chars = 4000
    truncated_content = content[:max_chars]
    suffix = "\n\n[truncated for context size]" if len(content) > max_chars else ""
    agent_state.add_message(
        "user",
        (
            f'<shared_repo_wiki title="{safe_title}">\n'
            f"{truncated_content}{suffix}\n"
            "</shared_repo_wiki>"
        ),
    )
def _append_wiki_update_on_finish(
    agent_state: Any,
    agent_name: str,
    result_summary: str,
    findings: list[str] | None,
    final_recommendations: list[str] | None,
) -> None:
    """Append a timestamped delta section for this agent to the shared repo wiki.

    Only runs for white-box agents. Best-effort: any failure (missing note,
    notes-backend errors) is swallowed so agent completion is never blocked
    by note persistence.
    """
    if not _is_whitebox_agent(agent_state.agent_id):
        return
    try:
        from strix.tools.notes.notes_actions import update_note

        note = _load_primary_wiki_note(agent_state)
        if not note:
            return
        note_id = note.get("note_id")
        if not isinstance(note_id, str) or not note_id:
            return

        existing_content = str(note.get("content") or "")
        timestamp = datetime.now(UTC).isoformat()

        # Collapse whitespace and cap length so one agent cannot bloat the note.
        summary = " ".join(str(result_summary).split())
        if len(summary) > 1200:
            summary = f"{summary[:1197]}..."

        findings_lines = "\n".join(f"- {item}" for item in (findings or [])) or "- none"
        recommendation_lines = (
            "\n".join(f"- {item}" for item in (final_recommendations or [])) or "- none"
        )

        delta = (
            f"\n\n## Agent Update: {agent_name} ({timestamp})\n"
            f"Summary: {summary}\n\n"
            "Findings:\n"
            f"{findings_lines}\n\n"
            "Recommendations:\n"
            f"{recommendation_lines}\n"
        )
        update_note(note_id=note_id, content=f"{existing_content.rstrip()}{delta}")
    except Exception:
        # Best-effort update; never block agent completion on note persistence.
        return
def _run_agent_in_thread(
agent: Any, state: Any, inherited_messages: list[dict[str, Any]]
) -> dict[str, Any]:
@@ -31,6 +168,8 @@ def _run_agent_in_thread(
state.add_message(msg["role"], msg["content"])
state.add_message("user", "</inherited_context_from_parent>")
_inject_wiki_context_for_whitebox(state)
parent_info = _agent_graph["nodes"].get(state.parent_id, {})
parent_name = parent_info.get("name", "Unknown Parent")
@@ -42,9 +181,14 @@ def _run_agent_in_thread(
wiki_memory_instruction = ""
if getattr(getattr(agent, "llm_config", None), "is_whitebox", False):
wiki_memory_instruction = (
'\n - White-box memory: call list_notes(category="wiki") early, '
"reuse existing repo wiki notes, and update the same note instead of "
"creating duplicates"
'\n - White-box memory (recommended): call list_notes(category="wiki") and then '
"get_note(note_id=...) before substantive work (including terminal scans)"
"\n - Reuse one repo wiki note where possible and avoid duplicates"
"\n - Before agent_finish, call list_notes(category=\"wiki\") + get_note(note_id=...) again, then append a short scope delta via update_note (new routes/sinks, scanner results, dynamic follow-ups)"
"\n - If terminal output contains `command not found` or shell parse errors, correct and rerun before using the result"
"\n - Use ASCII-only shell commands; if a command includes unexpected non-ASCII characters, rerun with a clean ASCII command"
"\n - Keep AST artifacts bounded: target relevant paths and avoid whole-repo generic function dumps"
"\n - Source-aware tooling is advisory: choose semgrep/AST/tree-sitter/gitleaks/trivy when relevant, do not force static steps for purely dynamic validation tasks"
)
task_xml = f"""<agent_delegation>
@@ -232,8 +376,23 @@ def create_agent(
if hasattr(parent_agent.llm_config, "is_whitebox"):
is_whitebox = parent_agent.llm_config.is_whitebox
interactive = getattr(parent_agent.llm_config, "interactive", False)
if hasattr(parent_agent.llm_config, "is_whitebox"):
is_whitebox = parent_agent.llm_config.is_whitebox
if is_whitebox:
whitebox_guidance = (
"\n\nWhite-box execution guidance (recommended when source is available):\n"
"- Use structural AST mapping (`sg` or `tree-sitter`) where it helps source analysis; "
"keep artifacts bounded and skip forced AST steps for purely dynamic validation tasks.\n"
"- Keep AST output bounded: scope to relevant paths/files, avoid whole-repo "
"generic function patterns, and cap artifact size.\n"
'- Use shared wiki memory by calling list_notes(category="wiki") then '
"get_note(note_id=...).\n"
'- Before agent_finish, call list_notes(category="wiki") + get_note(note_id=...) '
"again, reuse one repo wiki, and call update_note.\n"
"- If terminal output contains `command not found` or shell parse errors, "
"correct and rerun before using the result."
)
if "White-box execution guidance (recommended when source is available):" not in task:
task = f"{task.rstrip()}{whitebox_guidance}"
state = AgentState(
task=task,
@@ -395,6 +554,14 @@ def agent_finish(
"recommendations": final_recommendations or [],
}
_append_wiki_update_on_finish(
agent_state=agent_state,
agent_name=agent_node["name"],
result_summary=result_summary,
findings=findings,
final_recommendations=final_recommendations,
)
parent_notified = False
if report_to_parent and agent_node["parent_id"]:

View File

@@ -1,6 +1,7 @@
from .notes_actions import (
create_note,
delete_note,
get_note,
list_notes,
update_note,
)
@@ -9,6 +10,7 @@ from .notes_actions import (
__all__ = [
"create_note",
"delete_note",
"get_note",
"list_notes",
"update_note",
]

View File

@@ -12,6 +12,7 @@ _notes_storage: dict[str, dict[str, Any]] = {}
_VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "wiki"]
_notes_lock = threading.RLock()
_loaded_notes_run_dir: str | None = None
_DEFAULT_CONTENT_PREVIEW_CHARS = 280
def _get_run_dir() -> Path | None:
@@ -204,6 +205,38 @@ def _filter_notes(
return filtered_notes
def _to_note_listing_entry(
    note: dict[str, Any],
    *,
    include_content: bool = False,
) -> dict[str, Any]:
    """Project a stored note onto a listing entry.

    Always carries note metadata (id, title, category, tags, timestamps).
    With ``include_content`` the full ``content`` is attached; otherwise
    non-empty content yields a bounded ``content_preview``. The
    ``wiki_filename`` field is copied only when it is a non-empty string.
    """
    entry: dict[str, Any] = {
        "note_id": note.get("note_id"),
        "title": note.get("title", ""),
        "category": note.get("category", "general"),
        "tags": note.get("tags", []),
        "created_at": note.get("created_at", ""),
        "updated_at": note.get("updated_at", ""),
    }

    wiki_filename = note.get("wiki_filename")
    if isinstance(wiki_filename, str) and wiki_filename:
        entry["wiki_filename"] = wiki_filename

    content = str(note.get("content", ""))
    if include_content:
        entry["content"] = content
    elif content:
        limit = _DEFAULT_CONTENT_PREVIEW_CHARS
        if len(content) > limit:
            entry["content_preview"] = f"{content[:limit].rstrip()}..."
        else:
            entry["content_preview"] = content
    return entry
@register_tool(sandbox_execution=False)
def create_note( # noqa: PLR0911
title: str,
@@ -272,15 +305,20 @@ def list_notes(
category: str | None = None,
tags: list[str] | None = None,
search: str | None = None,
include_content: bool = False,
) -> dict[str, Any]:
with _notes_lock:
try:
filtered_notes = _filter_notes(category=category, tags=tags, search_query=search)
notes = [
_to_note_listing_entry(note, include_content=include_content)
for note in filtered_notes
]
return {
"success": True,
"notes": filtered_notes,
"total_count": len(filtered_notes),
"notes": notes,
"total_count": len(notes),
}
except (ValueError, TypeError) as e:
@@ -292,6 +330,40 @@ def list_notes(
}
@register_tool(sandbox_execution=False)
def get_note(note_id: str) -> dict[str, Any]:
    """Fetch one note by ID, including its full content.

    Returns ``{"success": True, "note": {...}}`` on a hit; otherwise a
    ``success: False`` payload with an ``error`` message and ``note: None``
    (empty ID, unknown ID, or a storage-layer ValueError/TypeError).
    """
    with _notes_lock:
        try:
            _ensure_notes_loaded()

            if not note_id or not note_id.strip():
                return {
                    "success": False,
                    "error": "Note ID cannot be empty",
                    "note": None,
                }

            stored = _notes_storage.get(note_id)
            if stored is None:
                return {
                    "success": False,
                    "error": f"Note with ID '{note_id}' not found",
                    "note": None,
                }

            # Copy so callers cannot mutate the stored note in place.
            note_with_id = stored.copy()
            note_with_id["note_id"] = note_id
        except (ValueError, TypeError) as e:
            return {
                "success": False,
                "error": f"Failed to get note: {e}",
                "note": None,
            }
        else:
            return {"success": True, "note": note_with_id}
@register_tool(sandbox_execution=False)
def update_note(
note_id: str,

View File

@@ -93,7 +93,7 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
</examples>
</tool>
<tool name="list_notes">
<description>List existing notes with optional filtering and search.</description>
<description>List existing notes with optional filtering and search (metadata-first by default).</description>
<parameters>
<parameter name="category" type="string" required="false">
<description>Filter by category</description>
@@ -104,9 +104,12 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
<parameter name="search" type="string" required="false">
<description>Search query to find in note titles and content</description>
</parameter>
<parameter name="include_content" type="boolean" required="false">
<description>Include full note content in each list item (default: false)</description>
</parameter>
</parameters>
<returns type="Dict[str, Any]">
<description>Response containing: - notes: List of matching notes - total_count: Total number of notes found</description>
<description>Response containing: - notes: List of matching notes (metadata + optional content/content_preview) - total_count: Total number of notes found</description>
</returns>
<examples>
# List all findings
@@ -131,6 +134,23 @@ The /api/internal/* endpoints are high priority as they appear to lack authentic
</function>
</examples>
</tool>
<tool name="get_note">
<description>Get a single note by ID, including full content.</description>
<parameters>
<parameter name="note_id" type="string" required="true">
<description>ID of the note to fetch</description>
</parameter>
</parameters>
<returns type="Dict[str, Any]">
<description>Response containing: - note: Note object including content - success: Whether note lookup succeeded</description>
</returns>
<examples>
# Read a specific wiki note after listing note IDs
<function=get_note>
<parameter=note_id>abc12</parameter>
</function>
</examples>
</tool>
<tool name="update_note">
<description>Update an existing note.</description>
<parameters>

View File

@@ -19,6 +19,7 @@ def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> N
whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=True), agent_name="StrixAgent")
assert "<source_aware_whitebox>" in whitebox_llm.system_prompt
assert "<source_aware_sast>" in whitebox_llm.system_prompt
assert "Begin with fast source triage" in whitebox_llm.system_prompt
assert "You MUST begin at the very first step by running the code and testing live." not in (
whitebox_llm.system_prompt
@@ -26,3 +27,4 @@ def test_whitebox_prompt_loads_source_aware_coordination_skill(monkeypatch) -> N
non_whitebox_llm = LLM(LLMConfig(scan_mode="quick", is_whitebox=False), agent_name="StrixAgent")
assert "<source_aware_whitebox>" not in non_whitebox_llm.system_prompt
assert "<source_aware_sast>" not in non_whitebox_llm.system_prompt

View File

@@ -58,6 +58,9 @@ def test_create_agent_inherits_parent_whitebox_flag(monkeypatch) -> None:
assert llm_config.timeout == 123
assert llm_config.scan_mode == "standard"
assert llm_config.is_whitebox is True
child_task = captured_config["agent_config"]["state"].task
assert "White-box execution guidance (recommended when source is available):" in child_task
assert "mandatory" not in child_task.lower()
def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypatch) -> None:
@@ -99,9 +102,197 @@ def test_delegation_prompt_includes_wiki_memory_instruction_in_whitebox(monkeypa
state = FakeState()
agent = FakeAgent()
agents_graph_actions._agent_instances[child_id] = agent
result = agents_graph_actions._run_agent_in_thread(agent, state, inherited_messages=[])
assert result["result"] == {"ok": True}
task_messages = [msg for role, msg in state.messages if role == "user"]
assert task_messages
assert 'list_notes(category="wiki")' in task_messages[-1]
assert "get_note(note_id=...)" in task_messages[-1]
assert "Before agent_finish" in task_messages[-1]
def test_agent_finish_appends_wiki_update_for_whitebox(monkeypatch) -> None:
    """agent_finish on a white-box child should read the shared repo wiki
    note and append a delta section to it via update_note."""
    monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")

    # Reset module-level agent registries so the test is hermetic.
    agents_graph_actions._agent_graph["nodes"].clear()
    agents_graph_actions._agent_graph["edges"].clear()
    agents_graph_actions._agent_messages.clear()
    agents_graph_actions._running_agents.clear()
    agents_graph_actions._agent_instances.clear()
    agents_graph_actions._agent_states.clear()

    parent_id = "parent-2"
    child_id = "child-2"
    agents_graph_actions._agent_graph["nodes"][parent_id] = {
        "name": "Parent",
        "task": "parent task",
        "status": "running",
        "parent_id": None,
    }
    agents_graph_actions._agent_graph["nodes"][child_id] = {
        "name": "Child",
        "task": "child task",
        "status": "running",
        "parent_id": parent_id,
    }
    # Mark the child as white-box so the wiki-update path is taken.
    agents_graph_actions._agent_instances[child_id] = SimpleNamespace(
        llm_config=LLMConfig(is_whitebox=True)
    )

    # Captures the update_note call arguments for assertions below.
    captured: dict[str, str] = {}

    def fake_list_notes(category=None):
        assert category == "wiki"
        return {
            "success": True,
            "notes": [{"note_id": "wiki-note-1", "content": "Existing wiki content"}],
            "total_count": 1,
        }

    # Captures which note ID was fetched before the update.
    captured_get: dict[str, str] = {}

    def fake_get_note(note_id: str):
        captured_get["note_id"] = note_id
        return {
            "success": True,
            "note": {
                "note_id": note_id,
                "title": "Repo Wiki",
                "content": "Existing wiki content",
            },
        }

    def fake_update_note(note_id: str, content: str):
        captured["note_id"] = note_id
        captured["content"] = content
        return {"success": True, "note_id": note_id}

    monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", fake_list_notes)
    monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", fake_get_note)
    monkeypatch.setattr("strix.tools.notes.notes_actions.update_note", fake_update_note)

    state = SimpleNamespace(agent_id=child_id, parent_id=parent_id)
    result = agents_graph_actions.agent_finish(
        agent_state=state,
        result_summary="AST pass completed",
        findings=["Found route sink candidate"],
        success=True,
        final_recommendations=["Validate sink with dynamic PoC"],
    )

    assert result["agent_completed"] is True
    # The wiki note must be read by ID and then updated with the agent delta.
    assert captured_get["note_id"] == "wiki-note-1"
    assert captured["note_id"] == "wiki-note-1"
    assert "Agent Update: Child" in captured["content"]
    assert "AST pass completed" in captured["content"]
def test_run_agent_in_thread_injects_shared_wiki_context_in_whitebox(monkeypatch) -> None:
    """A whitebox agent run via the thread runner should have the shared repo
    wiki injected as its first user message before its agent loop executes."""
    monkeypatch.setenv("STRIX_LLM", "openai/gpt-5")

    # Start every module-level agent registry from a clean slate.
    for registry in (
        agents_graph_actions._agent_graph["nodes"],
        agents_graph_actions._agent_graph["edges"],
        agents_graph_actions._agent_messages,
        agents_graph_actions._running_agents,
        agents_graph_actions._agent_instances,
        agents_graph_actions._agent_states,
    ):
        registry.clear()

    parent_id = "parent-3"
    child_id = "child-3"
    agents_graph_actions._agent_graph["nodes"][parent_id] = {"name": "Parent", "status": "running"}
    agents_graph_actions._agent_graph["nodes"][child_id] = {"name": "Child", "status": "running"}

    class StubState:
        """Minimal agent-state double recording messages added by the runner."""

        def __init__(self) -> None:
            self.agent_id = child_id
            self.agent_name = "Child"
            self.parent_id = parent_id
            self.task = "map source"
            self.stop_requested = False
            self.messages: list[tuple[str, str]] = []

        def add_message(self, role: str, content: str) -> None:
            self.messages.append((role, content))

        def model_dump(self) -> dict[str, str]:
            return {"agent_id": self.agent_id}

    class StubAgent:
        """Whitebox-configured agent whose loop returns a fixed payload."""

        def __init__(self) -> None:
            self.llm_config = LLMConfig(is_whitebox=True)

        async def agent_loop(self, _task: str) -> dict[str, bool]:
            return {"ok": True}

    requested: dict[str, str] = {}

    def stub_list_notes(category=None):
        # The wiki loader must only ever query the "wiki" category.
        assert category == "wiki"
        return {
            "success": True,
            "notes": [{"note_id": "wiki-ctx-1"}],
            "total_count": 1,
        }

    def stub_get_note(note_id: str):
        requested["note_id"] = note_id
        return {
            "success": True,
            "note": {
                "note_id": note_id,
                "title": "Shared Repo Wiki",
                "content": "Architecture: server/client split",
            },
        }

    monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", stub_list_notes)
    monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", stub_get_note)

    stub_state = StubState()
    stub_agent = StubAgent()
    agents_graph_actions._agent_instances[child_id] = stub_agent

    outcome = agents_graph_actions._run_agent_in_thread(stub_agent, stub_state, inherited_messages=[])

    assert outcome["result"] == {"ok": True}
    assert requested["note_id"] == "wiki-ctx-1"

    injected = [body for role, body in stub_state.messages if role == "user"]
    assert injected
    # The wiki context must arrive as the very first user message.
    assert "<shared_repo_wiki" in injected[0]
    assert "Architecture: server/client split" in injected[0]
def test_load_primary_wiki_note_prefers_repo_tag_match(monkeypatch) -> None:
    """When multiple wiki notes exist, the loader should select the one whose
    tags match the agent's whitebox repo tags — and fetch only that note."""
    fetched_ids: list[str] = []

    def stub_list_notes(category=None):
        # The loader must only ever query the "wiki" category.
        assert category == "wiki"
        return {
            "success": True,
            "notes": [
                {"note_id": "wiki-other", "tags": ["repo:other"]},
                {"note_id": "wiki-target", "tags": ["repo:appsmith"]},
            ],
            "total_count": 2,
        }

    def stub_get_note(note_id: str):
        fetched_ids.append(note_id)
        return {
            "success": True,
            "note": {"note_id": note_id, "title": "Repo Wiki", "content": "content"},
        }

    monkeypatch.setattr("strix.tools.notes.notes_actions.list_notes", stub_list_notes)
    monkeypatch.setattr("strix.tools.notes.notes_actions.get_note", stub_get_note)

    fake_state = SimpleNamespace(
        task="analyze /workspace/appsmith",
        context={"whitebox_repo_tags": ["repo:appsmith"]},
    )

    wiki_note = agents_graph_actions._load_primary_wiki_note(fake_state)

    assert wiki_note is not None
    assert wiki_note["note_id"] == "wiki-target"
    # Only the tag-matched note should have been fetched.
    assert fetched_ids == ["wiki-target"]

View File

@@ -78,6 +78,8 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) -
assert listed["success"] is True
assert listed["total_count"] == 1
assert listed["notes"][0]["note_id"] == note_id
assert "content" not in listed["notes"][0]
assert "content_preview" in listed["notes"][0]
updated = notes_actions.update_note(note_id=note_id, content="updated finding")
assert updated["success"] is True
@@ -87,6 +89,15 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) -
assert listed_after_update["success"] is True
assert listed_after_update["total_count"] == 1
assert listed_after_update["notes"][0]["note_id"] == note_id
assert listed_after_update["notes"][0]["content_preview"] == "updated finding"
listed_with_content = notes_actions.list_notes(
category="findings",
include_content=True,
)
assert listed_with_content["success"] is True
assert listed_with_content["total_count"] == 1
assert listed_with_content["notes"][0]["content"] == "updated finding"
deleted = notes_actions.delete_note(note_id=note_id)
assert deleted["success"] is True
@@ -98,3 +109,31 @@ def test_notes_jsonl_replay_survives_memory_reset(tmp_path: Path, monkeypatch) -
finally:
_reset_notes_state()
set_global_tracer(previous_tracer) # type: ignore[arg-type]
def test_get_note_returns_full_note(tmp_path: Path, monkeypatch) -> None:
    """get_note should return the complete note record, content included."""
    monkeypatch.chdir(tmp_path)
    _reset_notes_state()
    previous_tracer = get_global_tracer()
    set_global_tracer(Tracer("get-note-run"))
    try:
        created = notes_actions.create_note(
            title="Repo wiki",
            content="entrypoints and sinks",
            category="wiki",
            tags=["repo:appsmith"],
        )
        assert created["success"] is True

        note_id = created["note_id"]
        assert isinstance(note_id, str)

        fetched = notes_actions.get_note(note_id=note_id)
        assert fetched["success"] is True
        assert fetched["note"]["note_id"] == note_id
        assert fetched["note"]["content"] == "entrypoints and sinks"
    finally:
        # Restore shared notes/tracer state so other tests stay isolated.
        _reset_notes_state()
        set_global_tracer(previous_tracer)  # type: ignore[arg-type]