Add PostHog integration for analytics and error debugging

This commit is contained in:
0xallam
2026-01-09 12:26:56 -08:00
parent bcd6b8a715
commit 94bb97143e
7 changed files with 237 additions and 14 deletions

View File

@@ -316,16 +316,22 @@ class BaseAgent(metaclass=AgentMeta):
if not sandbox_mode and self.state.sandbox_id is None:
from strix.runtime import get_runtime
runtime = get_runtime()
sandbox_info = await runtime.create_sandbox(
self.state.agent_id, self.state.sandbox_token, self.local_sources
)
self.state.sandbox_id = sandbox_info["workspace_id"]
self.state.sandbox_token = sandbox_info["auth_token"]
self.state.sandbox_info = sandbox_info
try:
runtime = get_runtime()
sandbox_info = await runtime.create_sandbox(
self.state.agent_id, self.state.sandbox_token, self.local_sources
)
self.state.sandbox_id = sandbox_info["workspace_id"]
self.state.sandbox_token = sandbox_info["auth_token"]
self.state.sandbox_info = sandbox_info
if "agent_id" in sandbox_info:
self.state.sandbox_info["agent_id"] = sandbox_info["agent_id"]
if "agent_id" in sandbox_info:
self.state.sandbox_info["agent_id"] = sandbox_info["agent_id"]
except Exception as e:
from strix.telemetry import posthog
posthog.error("sandbox_init_error", str(e))
raise
if not self.state.task:
self.state.task = task

View File

@@ -34,6 +34,7 @@ from strix.interface.utils import (
validate_llm_response,
)
from strix.runtime.docker_runtime import HOST_GATEWAY_HOSTNAME, STRIX_IMAGE
from strix.telemetry import posthog
from strix.telemetry.tracer import get_global_tracer
@@ -514,10 +515,32 @@ def main() -> None:
args.local_sources = collect_local_sources(args.targets_info)
if args.non_interactive:
asyncio.run(run_cli(args))
else:
asyncio.run(run_tui(args))
is_whitebox = bool(args.local_sources)
posthog.start(
model=os.getenv("STRIX_LLM"),
scan_mode=args.scan_mode,
is_whitebox=is_whitebox,
interactive=not args.non_interactive,
has_instructions=bool(args.instruction),
)
exit_reason = "user_exit"
try:
if args.non_interactive:
asyncio.run(run_cli(args))
else:
asyncio.run(run_tui(args))
except KeyboardInterrupt:
exit_reason = "interrupted"
except Exception as e:
exit_reason = "error"
posthog.error("unhandled_exception", str(e))
raise
finally:
tracer = get_global_tracer()
if tracer:
posthog.end(tracer, exit_reason=exit_reason)
results_path = Path("strix_runs") / args.run_name
display_completion_message(args, results_path)

View File

@@ -320,9 +320,15 @@ class LLM:
(litellm.APIError, "API error"),
(litellm.OpenAIError, "OpenAI error"),
]
from strix.telemetry import posthog
for error_type, message in error_map:
if isinstance(e, error_type):
posthog.error(f"llm_{error_type.__name__}", message)
raise LLMRequestFailedError(f"LLM request failed: {message}", str(e)) from e
posthog.error("llm_unknown_error", type(e).__name__)
raise LLMRequestFailedError(f"LLM request failed: {type(e).__name__}", str(e)) from e
async def generate(

42
strix/telemetry/README.md Normal file
View File

@@ -0,0 +1,42 @@
---
title: "Telemetry"
---
### Overview
To help make Strix better for everyone, we collect anonymized data that helps us understand how to better improve our AI security agent for our users, guide the addition of new features, and fix common errors and bugs. This feedback loop is crucial for improving Strix's capabilities and user experience.
We use [PostHog](https://posthog.com), an open-source analytics platform, for data collection and analysis. Our telemetry implementation is fully transparent - you can review the [source code](https://github.com/strix-ai/strix/blob/main/strix/telemetry/posthog.py) to see exactly what we track.
### Telemetry Policy
Privacy is our priority. All collected data is anonymized by default. Each session gets a random UUID that is not persisted or tied to you. Your code, scan targets, vulnerability details, and findings always remain private and are never collected.
### What We Track
We collect only very **basic** usage data including:
**Session Errors:** Duration and error types (not messages or stack traces)\
**System Context:** OS type, architecture, Strix version\
**Scan Context:** Scan mode (quick/standard/deep), scan type (whitebox/blackbox)\
**Model Usage:** Which LLM model is being used (not prompts or responses)\
**Aggregate Metrics:** Vulnerability counts by severity, agent/tool counts, token usage and cost estimates
For complete transparency, you can inspect our [telemetry implementation](https://github.com/strix-ai/strix/blob/main/strix/telemetry/posthog.py) to see the exact events we track.
### What We **Never** Collect
- IP addresses, usernames, or any identifying information
- Scan targets, file paths, target URLs, or domains
- Vulnerability details, descriptions, or code
- LLM requests and responses
### How to Opt Out
Telemetry in Strix is entirely **optional**:
```bash
export STRIX_TELEMETRY=0
```
You can set this environment variable before running Strix to disable **all** telemetry.

View File

@@ -1,4 +1,10 @@
from . import posthog
from .tracer import Tracer, get_global_tracer, set_global_tracer
__all__ = ["Tracer", "get_global_tracer", "set_global_tracer"]
__all__ = [
"Tracer",
"get_global_tracer",
"posthog",
"set_global_tracer",
]

136
strix/telemetry/posthog.py Normal file
View File

@@ -0,0 +1,136 @@
import json
import os
import platform
import sys
import urllib.request
from pathlib import Path
from typing import TYPE_CHECKING, Any
from uuid import uuid4
if TYPE_CHECKING:
from strix.telemetry.tracer import Tracer
_POSTHOG_PUBLIC_API_KEY = "phc_7rO3XRuNT5sgSKAl6HDIrWdSGh1COzxw0vxVIAR6vVZ"
_POSTHOG_HOST = "https://us.i.posthog.com"
_SESSION_ID = uuid4().hex[:16]
def _is_enabled() -> bool:
return os.getenv("STRIX_TELEMETRY", "1").lower() not in ("0", "false", "no", "off")
def _is_first_run() -> bool:
marker = Path.home() / ".strix" / ".seen"
if marker.exists():
return False
try:
marker.parent.mkdir(parents=True, exist_ok=True)
marker.touch()
except Exception: # noqa: BLE001, S110
pass # nosec B110
return True
def _get_version() -> str:
try:
from importlib.metadata import version
return version("strix-agent")
except Exception: # noqa: BLE001
return "unknown"
def _send(event: str, properties: dict[str, Any]) -> None:
if not _is_enabled():
return
try:
payload = {
"api_key": _POSTHOG_PUBLIC_API_KEY,
"event": event,
"distinct_id": _SESSION_ID,
"properties": properties,
}
req = urllib.request.Request( # noqa: S310
f"{_POSTHOG_HOST}/capture/",
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310
pass
except Exception: # noqa: BLE001, S110
pass # nosec B110
def _base_props() -> dict[str, Any]:
return {
"os": platform.system().lower(),
"arch": platform.machine(),
"python": f"{sys.version_info.major}.{sys.version_info.minor}",
"strix_version": _get_version(),
}
def start(
model: str | None,
scan_mode: str | None,
is_whitebox: bool,
interactive: bool,
has_instructions: bool,
) -> None:
_send(
"scan_started",
{
**_base_props(),
"model": model or "unknown",
"scan_mode": scan_mode or "unknown",
"scan_type": "whitebox" if is_whitebox else "blackbox",
"interactive": interactive,
"has_instructions": has_instructions,
"first_run": _is_first_run(),
},
)
def finding(severity: str) -> None:
_send(
"finding_reported",
{
**_base_props(),
"severity": severity.lower(),
},
)
def end(tracer: "Tracer", exit_reason: str = "completed") -> None:
vulnerabilities_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
for v in tracer.vulnerability_reports:
sev = v.get("severity", "info").lower()
if sev in vulnerabilities_counts:
vulnerabilities_counts[sev] += 1
llm = tracer.get_total_llm_stats()
total = llm.get("total", {})
_send(
"scan_ended",
{
**_base_props(),
"exit_reason": exit_reason,
"duration_seconds": round(tracer._calculate_duration()),
"vulnerabilities_total": len(tracer.vulnerability_reports),
**{f"vulnerabilities_{k}": v for k, v in vulnerabilities_counts.items()},
"agent_count": len(tracer.agents),
"tool_count": tracer.get_real_tool_count(),
"llm_tokens": llm.get("total_tokens", 0),
"llm_cost": total.get("cost", 0.0),
},
)
def error(error_type: str, error_msg: str | None = None) -> None:
props = {**_base_props(), "error_type": error_type}
if error_msg:
props["error_msg"] = error_msg
_send("error", props)

View File

@@ -4,6 +4,8 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional
from uuid import uuid4
from strix.telemetry import posthog
if TYPE_CHECKING:
from collections.abc import Callable
@@ -136,6 +138,7 @@ class Tracer:
self.vulnerability_reports.append(report)
logger.info(f"Added vulnerability report: {report_id} - {title}")
posthog.finding(severity)
if self.vulnerability_found_callback:
self.vulnerability_found_callback(report)
@@ -181,6 +184,7 @@ class Tracer:
logger.info("Updated scan final fields")
self.save_run_data(mark_complete=True)
posthog.end(self, exit_reason="finished_by_tool")
def log_agent_creation(
self, agent_id: str, name: str, task: str, parent_id: str | None = None