Files
strix/strix/interface/tui.py
2025-12-03 18:45:01 +00:00

1275 lines
44 KiB
Python

import argparse
import asyncio
import atexit
import logging
import random
import signal
import sys
import threading
from collections.abc import Callable
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
from typing import TYPE_CHECKING, Any, ClassVar, cast
if TYPE_CHECKING:
from textual.timer import Timer
from rich.align import Align
from rich.console import Group
from rich.markup import escape as rich_escape
from rich.panel import Panel
from rich.style import Style
from rich.text import Text
from textual import events, on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Grid, Horizontal, Vertical, VerticalScroll
from textual.reactive import reactive
from textual.screen import ModalScreen
from textual.widgets import Button, Label, Static, TextArea, Tree
from textual.widgets.tree import TreeNode
from strix.agents.StrixAgent import StrixAgent
from strix.interface.utils import build_live_stats_text
from strix.llm.config import LLMConfig
from strix.telemetry.tracer import Tracer, set_global_tracer
def escape_markup(text: str) -> str:
return cast("str", rich_escape(text))
def get_package_version() -> str:
try:
return pkg_version("strix-agent")
except PackageNotFoundError:
return "dev"
class ChatTextArea(TextArea): # type: ignore[misc]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._app_reference: StrixTUIApp | None = None
def set_app_reference(self, app: "StrixTUIApp") -> None:
self._app_reference = app
def _on_key(self, event: events.Key) -> None:
if event.key == "enter" and self._app_reference:
text_content = str(self.text) # type: ignore[has-type]
message = text_content.strip()
if message:
self.text = ""
self._app_reference._send_user_message(message)
event.prevent_default()
return
super()._on_key(event)
class SplashScreen(Static): # type: ignore[misc]
PRIMARY_GREEN = "#22c55e"
BANNER = (
" ███████╗████████╗██████╗ ██╗██╗ ██╗\n"
" ██╔════╝╚══██╔══╝██╔══██╗██║╚██╗██╔╝\n"
" ███████╗ ██║ ██████╔╝██║ ╚███╔╝\n"
" ╚════██║ ██║ ██╔══██╗██║ ██╔██╗\n"
" ███████║ ██║ ██║ ██║██║██╔╝ ██╗\n"
" ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝"
)
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._animation_step = 0
self._animation_timer: Timer | None = None
self._panel_static: Static | None = None
self._version = "dev"
def compose(self) -> ComposeResult:
self._version = get_package_version()
self._animation_step = 0
start_line = self._build_start_line_text(self._animation_step)
panel = self._build_panel(start_line)
panel_static = Static(panel, id="splash_content")
self._panel_static = panel_static
yield panel_static
def on_mount(self) -> None:
self._animation_timer = self.set_interval(0.45, self._animate_start_line)
def on_unmount(self) -> None:
if self._animation_timer is not None:
self._animation_timer.stop()
self._animation_timer = None
def _animate_start_line(self) -> None:
if not self._panel_static:
return
self._animation_step += 1
start_line = self._build_start_line_text(self._animation_step)
panel = self._build_panel(start_line)
self._panel_static.update(panel)
def _build_panel(self, start_line: Text) -> Panel:
content = Group(
Align.center(Text(self.BANNER.strip("\n"), style=self.PRIMARY_GREEN, justify="center")),
Align.center(Text(" ")),
Align.center(self._build_welcome_text()),
Align.center(self._build_version_text()),
Align.center(self._build_tagline_text()),
Align.center(Text(" ")),
Align.center(start_line.copy()),
)
return Panel.fit(content, border_style=self.PRIMARY_GREEN, padding=(1, 6))
def _build_welcome_text(self) -> Text:
text = Text("Welcome to ", style=Style(color="white", bold=True))
text.append("Strix", style=Style(color=self.PRIMARY_GREEN, bold=True))
text.append("!", style=Style(color="white", bold=True))
return text
def _build_version_text(self) -> Text:
return Text(f"v{self._version}", style=Style(color="white", dim=True))
def _build_tagline_text(self) -> Text:
return Text("Open-source AI hackers for your apps", style=Style(color="white", dim=True))
def _build_start_line_text(self, phase: int) -> Text:
emphasize = phase % 2 == 1
base_style = Style(color="white", dim=not emphasize, bold=emphasize)
strix_style = Style(color=self.PRIMARY_GREEN, bold=bool(emphasize))
text = Text("Starting ", style=base_style)
text.append("Strix", style=strix_style)
text.append(" Cybersecurity Agent", style=base_style)
return text
class HelpScreen(ModalScreen): # type: ignore[misc]
def compose(self) -> ComposeResult:
yield Grid(
Label("🦉 Strix Help", id="help_title"),
Label(
"F1 Help\nCtrl+Q/C Quit\nESC Stop Agent\n"
"Enter Send message to agent\nTab Switch panels\n↑/↓ Navigate tree",
id="help_content",
),
id="dialog",
)
def on_key(self, _event: events.Key) -> None:
self.app.pop_screen()
class StopAgentScreen(ModalScreen): # type: ignore[misc]
def __init__(self, agent_name: str, agent_id: str):
super().__init__()
self.agent_name = agent_name
self.agent_id = agent_id
def compose(self) -> ComposeResult:
yield Grid(
Label(f"🛑 Stop '{self.agent_name}'?", id="stop_agent_title"),
Grid(
Button("Yes", variant="error", id="stop_agent"),
Button("No", variant="default", id="cancel_stop"),
id="stop_agent_buttons",
),
id="stop_agent_dialog",
)
def on_mount(self) -> None:
cancel_button = self.query_one("#cancel_stop", Button)
cancel_button.focus()
def on_key(self, event: events.Key) -> None:
if event.key in ("left", "right", "up", "down"):
focused = self.focused
if focused and focused.id == "stop_agent":
cancel_button = self.query_one("#cancel_stop", Button)
cancel_button.focus()
else:
stop_button = self.query_one("#stop_agent", Button)
stop_button.focus()
event.prevent_default()
elif event.key == "enter":
focused = self.focused
if focused and isinstance(focused, Button):
focused.press()
event.prevent_default()
elif event.key == "escape":
self.app.pop_screen()
event.prevent_default()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "stop_agent":
self.app.action_confirm_stop_agent(self.agent_id)
else:
self.app.pop_screen()
class QuitScreen(ModalScreen): # type: ignore[misc]
def compose(self) -> ComposeResult:
yield Grid(
Label("🦉 Quit Strix? ", id="quit_title"),
Grid(
Button("Yes", variant="error", id="quit"),
Button("No", variant="default", id="cancel"),
id="quit_buttons",
),
id="quit_dialog",
)
def on_mount(self) -> None:
cancel_button = self.query_one("#cancel", Button)
cancel_button.focus()
def on_key(self, event: events.Key) -> None:
if event.key in ("left", "right", "up", "down"):
focused = self.focused
if focused and focused.id == "quit":
cancel_button = self.query_one("#cancel", Button)
cancel_button.focus()
else:
quit_button = self.query_one("#quit", Button)
quit_button.focus()
event.prevent_default()
elif event.key == "enter":
focused = self.focused
if focused and isinstance(focused, Button):
focused.press()
event.prevent_default()
elif event.key == "escape":
self.app.pop_screen()
event.prevent_default()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "quit":
self.app.action_custom_quit()
else:
self.app.pop_screen()
class StrixTUIApp(App): # type: ignore[misc]
CSS_PATH = "assets/tui_styles.tcss"
selected_agent_id: reactive[str | None] = reactive(default=None)
show_splash: reactive[bool] = reactive(default=True)
BINDINGS: ClassVar[list[Binding]] = [
Binding("f1", "toggle_help", "Help", priority=True),
Binding("ctrl+q", "request_quit", "Quit", priority=True),
Binding("ctrl+c", "request_quit", "Quit", priority=True),
Binding("escape", "stop_selected_agent", "Stop Agent", priority=True),
]
def __init__(self, args: argparse.Namespace):
super().__init__()
self.args = args
self.scan_config = self._build_scan_config(args)
self.agent_config = self._build_agent_config(args)
self.tracer = Tracer(self.scan_config["run_name"])
self.tracer.set_scan_config(self.scan_config)
set_global_tracer(self.tracer)
self.agent_nodes: dict[str, TreeNode] = {}
self._displayed_agents: set[str] = set()
self._displayed_events: list[str] = []
self._scan_thread: threading.Thread | None = None
self._scan_stop_event = threading.Event()
self._scan_completed = threading.Event()
self._action_verbs = [
"Generating",
"Scanning",
"Analyzing",
"Probing",
"Hacking",
"Testing",
"Exploiting",
"Investigating",
]
self._agent_verbs: dict[str, str] = {} # agent_id -> current_verb
self._agent_verb_timers: dict[str, Any] = {} # agent_id -> timer
self._agent_dot_states: dict[str, int] = {} # agent_id -> dot_count (0-3)
self._dot_animation_timer: Any | None = None
self._setup_cleanup_handlers()
def _build_scan_config(self, args: argparse.Namespace) -> dict[str, Any]:
return {
"scan_id": args.run_name,
"targets": args.targets_info,
"user_instructions": args.instruction or "",
"run_name": args.run_name,
}
def _build_agent_config(self, args: argparse.Namespace) -> dict[str, Any]:
llm_config = LLMConfig()
config = {
"llm_config": llm_config,
"max_iterations": 300,
}
if getattr(args, "local_sources", None):
config["local_sources"] = args.local_sources
return config
def _setup_cleanup_handlers(self) -> None:
def cleanup_on_exit() -> None:
self.tracer.cleanup()
def signal_handler(_signum: int, _frame: Any) -> None:
self.tracer.cleanup()
sys.exit(0)
atexit.register(cleanup_on_exit)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, signal_handler)
def compose(self) -> ComposeResult:
if self.show_splash:
yield SplashScreen(id="splash_screen")
def watch_show_splash(self, show_splash: bool) -> None:
if not show_splash and self.is_mounted:
try:
splash = self.query_one("#splash_screen")
splash.remove()
except ValueError:
pass
main_container = Vertical(id="main_container")
self.mount(main_container)
content_container = Horizontal(id="content_container")
main_container.mount(content_container)
chat_area_container = Vertical(id="chat_area_container")
chat_display = Static("", id="chat_display")
chat_history = VerticalScroll(chat_display, id="chat_history")
chat_history.can_focus = True
status_text = Static("", id="status_text")
keymap_indicator = Static("", id="keymap_indicator")
agent_status_display = Horizontal(
status_text, keymap_indicator, id="agent_status_display", classes="hidden"
)
chat_prompt = Static("> ", id="chat_prompt")
chat_input = ChatTextArea(
"",
id="chat_input",
show_line_numbers=False,
)
chat_input.set_app_reference(self)
chat_input_container = Horizontal(chat_prompt, chat_input, id="chat_input_container")
agents_tree = Tree("🤖 Active Agents", id="agents_tree")
agents_tree.root.expand()
agents_tree.show_root = False
agents_tree.show_guide = True
agents_tree.guide_depth = 3
agents_tree.guide_style = "dashed"
stats_display = Static("", id="stats_display")
sidebar = Vertical(agents_tree, stats_display, id="sidebar")
content_container.mount(chat_area_container)
content_container.mount(sidebar)
chat_area_container.mount(chat_history)
chat_area_container.mount(agent_status_display)
chat_area_container.mount(chat_input_container)
self.call_after_refresh(self._focus_chat_input)
def _focus_chat_input(self) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
try:
chat_input = self.query_one("#chat_input", ChatTextArea)
chat_input.show_vertical_scrollbar = False
chat_input.show_horizontal_scrollbar = False
chat_input.focus()
except (ValueError, Exception):
self.call_after_refresh(self._focus_chat_input)
def _focus_agents_tree(self) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
try:
agents_tree = self.query_one("#agents_tree", Tree)
agents_tree.focus()
if agents_tree.root.children:
first_node = agents_tree.root.children[0]
agents_tree.select_node(first_node)
except (ValueError, Exception):
self.call_after_refresh(self._focus_agents_tree)
def on_mount(self) -> None:
self.title = "strix"
self.set_timer(4.5, self._hide_splash_screen)
def _hide_splash_screen(self) -> None:
self.show_splash = False
self._start_scan_thread()
self.set_interval(0.5, self._update_ui_from_tracer)
def _update_ui_from_tracer(self) -> None:
if self.show_splash:
return
if len(self.screen_stack) > 1:
return
if not self.is_mounted:
return
try:
chat_history = self.query_one("#chat_history", VerticalScroll)
agents_tree = self.query_one("#agents_tree", Tree)
if not self._is_widget_safe(chat_history) or not self._is_widget_safe(agents_tree):
return
except (ValueError, Exception):
return
agent_updates = False
for agent_id, agent_data in self.tracer.agents.items():
if agent_id not in self._displayed_agents:
self._add_agent_node(agent_data)
self._displayed_agents.add(agent_id)
agent_updates = True
elif self._update_agent_node(agent_id, agent_data):
agent_updates = True
if agent_updates:
self._expand_all_agent_nodes()
self._update_chat_view()
self._update_agent_status_display()
self._update_stats_display()
def _update_agent_node(self, agent_id: str, agent_data: dict[str, Any]) -> bool:
if agent_id not in self.agent_nodes:
return False
try:
agent_node = self.agent_nodes[agent_id]
agent_name_raw = agent_data.get("name", "Agent")
status = agent_data.get("status", "running")
status_indicators = {
"running": "🟢",
"waiting": "⏸️",
"completed": "",
"failed": "",
"stopped": "⏹️",
"stopping": "⏸️",
"llm_failed": "🔴",
}
status_icon = status_indicators.get(status, "🔵")
agent_name = f"{status_icon} {escape_markup(agent_name_raw)}"
if status == "running":
self._start_agent_verb_timer(agent_id)
elif status == "waiting":
self._stop_agent_verb_timer(agent_id)
else:
self._stop_agent_verb_timer(agent_id)
if agent_node.label != agent_name:
agent_node.set_label(agent_name)
return True
except (KeyError, AttributeError, ValueError) as e:
import logging
logging.warning(f"Failed to update agent node label: {e}")
return False
def _update_chat_view(self) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
try:
chat_history = self.query_one("#chat_history", VerticalScroll)
except (ValueError, Exception):
return
if not self._is_widget_safe(chat_history):
return
try:
is_at_bottom = chat_history.scroll_y >= chat_history.max_scroll_y
except (AttributeError, ValueError):
is_at_bottom = True
if not self.selected_agent_id:
content, css_class = self._get_chat_placeholder_content(
"Select an agent from the tree to see its activity.", "placeholder-no-agent"
)
else:
events = self._gather_agent_events(self.selected_agent_id)
if not events:
content, css_class = self._get_chat_placeholder_content(
"Starting agent...", "placeholder-no-activity"
)
else:
current_event_ids = [e["id"] for e in events]
if current_event_ids == self._displayed_events:
return
content = self._get_rendered_events_content(events)
css_class = "chat-content"
self._displayed_events = current_event_ids
chat_display = self.query_one("#chat_display", Static)
self._update_static_content_safe(chat_display, content)
chat_display.set_classes(css_class)
if is_at_bottom:
self.call_later(chat_history.scroll_end, animate=False)
def _get_chat_placeholder_content(
self, message: str, placeholder_class: str
) -> tuple[str, str]:
self._displayed_events = [placeholder_class]
return message, f"chat-placeholder {placeholder_class}"
def _get_rendered_events_content(self, events: list[dict[str, Any]]) -> str:
if not events:
return ""
content_lines = []
for event in events:
if event["type"] == "chat":
chat_content = self._render_chat_content(event["data"])
if chat_content:
content_lines.append(chat_content)
elif event["type"] == "tool":
tool_content = self._render_tool_content_simple(event["data"])
if tool_content:
content_lines.append(tool_content)
return "\n\n".join(content_lines)
def _update_agent_status_display(self) -> None:
try:
status_display = self.query_one("#agent_status_display", Horizontal)
status_text = self.query_one("#status_text", Static)
keymap_indicator = self.query_one("#keymap_indicator", Static)
except (ValueError, Exception):
return
widgets = [status_display, status_text, keymap_indicator]
if not all(self._is_widget_safe(w) for w in widgets):
return
if not self.selected_agent_id:
self._safe_widget_operation(status_display.add_class, "hidden")
return
try:
agent_data = self.tracer.agents[self.selected_agent_id]
status = agent_data.get("status", "running")
if status == "stopping":
self._safe_widget_operation(status_text.update, "Agent stopping...")
self._safe_widget_operation(keymap_indicator.update, "")
self._safe_widget_operation(status_display.remove_class, "hidden")
elif status == "stopped":
self._safe_widget_operation(status_text.update, "Agent stopped")
self._safe_widget_operation(keymap_indicator.update, "")
self._safe_widget_operation(status_display.remove_class, "hidden")
elif status == "completed":
self._safe_widget_operation(status_text.update, "Agent completed")
self._safe_widget_operation(keymap_indicator.update, "")
self._safe_widget_operation(status_display.remove_class, "hidden")
elif status == "llm_failed":
error_msg = agent_data.get("error_message", "")
display_msg = (
f"[red]{escape_markup(error_msg)}[/red]"
if error_msg
else "[red]LLM request failed[/red]"
)
self._safe_widget_operation(status_text.update, display_msg)
self._safe_widget_operation(
keymap_indicator.update, "[dim]Send message to retry[/dim]"
)
self._safe_widget_operation(status_display.remove_class, "hidden")
self._stop_dot_animation()
elif status == "waiting":
animated_text = self._get_animated_waiting_text(self.selected_agent_id)
self._safe_widget_operation(status_text.update, animated_text)
self._safe_widget_operation(
keymap_indicator.update, "[dim]Send message to resume[/dim]"
)
self._safe_widget_operation(status_display.remove_class, "hidden")
self._start_dot_animation()
elif status == "running":
current_verb = self._get_agent_verb(self.selected_agent_id)
animated_text = self._get_animated_verb_text(self.selected_agent_id, current_verb)
self._safe_widget_operation(status_text.update, animated_text)
self._safe_widget_operation(
keymap_indicator.update, "[dim]ESC to stop | CTRL-C to quit and save[/dim]"
)
self._safe_widget_operation(status_display.remove_class, "hidden")
self._start_dot_animation()
else:
self._safe_widget_operation(status_display.add_class, "hidden")
except (KeyError, Exception):
self._safe_widget_operation(status_display.add_class, "hidden")
def _update_stats_display(self) -> None:
try:
stats_display = self.query_one("#stats_display", Static)
except (ValueError, Exception):
return
if not self._is_widget_safe(stats_display):
return
stats_content = Text()
stats_text = build_live_stats_text(self.tracer, self.agent_config)
if stats_text:
stats_content.append(stats_text)
from rich.panel import Panel
stats_panel = Panel(
stats_content,
title="📊 Live Stats",
title_align="left",
border_style="#22c55e",
padding=(0, 1),
)
self._safe_widget_operation(stats_display.update, stats_panel)
def _get_agent_verb(self, agent_id: str) -> str:
if agent_id not in self._agent_verbs:
self._agent_verbs[agent_id] = random.choice(self._action_verbs) # nosec B311 # noqa: S311
return self._agent_verbs[agent_id]
def _start_agent_verb_timer(self, agent_id: str) -> None:
if agent_id not in self._agent_verb_timers:
self._agent_verb_timers[agent_id] = self.set_interval(
30.0, lambda: self._change_agent_action_verb(agent_id)
)
def _stop_agent_verb_timer(self, agent_id: str) -> None:
if agent_id in self._agent_verb_timers:
self._agent_verb_timers[agent_id].stop()
del self._agent_verb_timers[agent_id]
def _change_agent_action_verb(self, agent_id: str) -> None:
if agent_id not in self._agent_verbs:
self._agent_verbs[agent_id] = random.choice(self._action_verbs) # nosec B311 # noqa: S311
return
current_verb = self._agent_verbs[agent_id]
available_verbs = [verb for verb in self._action_verbs if verb != current_verb]
self._agent_verbs[agent_id] = random.choice(available_verbs) # nosec B311 # noqa: S311
if self.selected_agent_id == agent_id:
self._update_agent_status_display()
def _get_animated_verb_text(self, agent_id: str, verb: str) -> str:
if agent_id not in self._agent_dot_states:
self._agent_dot_states[agent_id] = 0
dot_count = self._agent_dot_states[agent_id]
dots = "." * dot_count
return f"{verb}{dots}"
def _get_animated_waiting_text(self, agent_id: str) -> str:
if agent_id not in self._agent_dot_states:
self._agent_dot_states[agent_id] = 0
dot_count = self._agent_dot_states[agent_id]
dots = "." * dot_count
return f"Waiting{dots}"
def _start_dot_animation(self) -> None:
if self._dot_animation_timer is None:
self._dot_animation_timer = self.set_interval(0.6, self._animate_dots)
def _stop_dot_animation(self) -> None:
if self._dot_animation_timer is not None:
self._dot_animation_timer.stop()
self._dot_animation_timer = None
def _animate_dots(self) -> None:
has_active_agents = False
for agent_id, agent_data in self.tracer.agents.items():
status = agent_data.get("status", "running")
if status in ["running", "waiting"]:
has_active_agents = True
current_dots = self._agent_dot_states.get(agent_id, 0)
self._agent_dot_states[agent_id] = (current_dots + 1) % 4
if (
has_active_agents
and self.selected_agent_id
and self.selected_agent_id in self.tracer.agents
):
selected_status = self.tracer.agents[self.selected_agent_id].get("status", "running")
if selected_status in ["running", "waiting"]:
self._update_agent_status_display()
if not has_active_agents:
self._stop_dot_animation()
for agent_id in list(self._agent_dot_states.keys()):
if agent_id not in self.tracer.agents or self.tracer.agents[agent_id].get(
"status"
) not in ["running", "waiting"]:
del self._agent_dot_states[agent_id]
def _gather_agent_events(self, agent_id: str) -> list[dict[str, Any]]:
chat_events = [
{
"type": "chat",
"timestamp": msg["timestamp"],
"id": f"chat_{msg['message_id']}",
"data": msg,
}
for msg in self.tracer.chat_messages
if msg.get("agent_id") == agent_id
]
tool_events = [
{
"type": "tool",
"timestamp": tool_data["timestamp"],
"id": f"tool_{exec_id}",
"data": tool_data,
}
for exec_id, tool_data in self.tracer.tool_executions.items()
if tool_data.get("agent_id") == agent_id
]
events = chat_events + tool_events
events.sort(key=lambda e: (e["timestamp"], e["id"]))
return events
def watch_selected_agent_id(self, _agent_id: str | None) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
self._displayed_events.clear()
self.call_later(self._update_chat_view)
self._update_agent_status_display()
def _start_scan_thread(self) -> None:
def scan_target() -> None:
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent = StrixAgent(self.agent_config)
if not self._scan_stop_event.is_set():
loop.run_until_complete(agent.execute_scan(self.scan_config))
except (KeyboardInterrupt, asyncio.CancelledError):
logging.info("Scan interrupted by user")
except (ConnectionError, TimeoutError):
logging.exception("Network error during scan")
except RuntimeError:
logging.exception("Runtime error during scan")
except Exception:
logging.exception("Unexpected error during scan")
finally:
loop.close()
self._scan_completed.set()
except Exception:
logging.exception("Error setting up scan thread")
self._scan_completed.set()
self._scan_thread = threading.Thread(target=scan_target, daemon=True)
self._scan_thread.start()
def _add_agent_node(self, agent_data: dict[str, Any]) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
agent_id = agent_data["id"]
parent_id = agent_data.get("parent_id")
status = agent_data.get("status", "running")
try:
agents_tree = self.query_one("#agents_tree", Tree)
except (ValueError, Exception):
return
agent_name_raw = agent_data.get("name", "Agent")
status_indicators = {
"running": "🟢",
"waiting": "🟡",
"completed": "",
"failed": "",
"stopped": "⏹️",
"stopping": "⏸️",
}
status_icon = status_indicators.get(status, "🔵")
agent_name = f"{status_icon} {escape_markup(agent_name_raw)}"
if status in ["running", "waiting"]:
self._start_agent_verb_timer(agent_id)
try:
if parent_id and parent_id in self.agent_nodes:
parent_node = self.agent_nodes[parent_id]
agent_node = parent_node.add(
agent_name,
data={"agent_id": agent_id},
)
parent_node.allow_expand = True
else:
agent_node = agents_tree.root.add(
agent_name,
data={"agent_id": agent_id},
)
agent_node.allow_expand = False
agent_node.expand()
self.agent_nodes[agent_id] = agent_node
if len(self.agent_nodes) == 1:
agents_tree.select_node(agent_node)
self.selected_agent_id = agent_id
self._reorganize_orphaned_agents(agent_id)
except (AttributeError, ValueError, RuntimeError) as e:
import logging
logging.warning(f"Failed to add agent node {agent_id}: {e}")
def _expand_all_agent_nodes(self) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
try:
agents_tree = self.query_one("#agents_tree", Tree)
self._expand_node_recursively(agents_tree.root)
except (ValueError, Exception):
logging.debug("Tree not ready for expanding nodes")
def _expand_node_recursively(self, node: TreeNode) -> None:
if not node.is_expanded:
node.expand()
for child in node.children:
self._expand_node_recursively(child)
def _copy_node_under(self, node_to_copy: TreeNode, new_parent: TreeNode) -> None:
agent_id = node_to_copy.data["agent_id"]
agent_data = self.tracer.agents.get(agent_id, {})
agent_name_raw = agent_data.get("name", "Agent")
status = agent_data.get("status", "running")
status_indicators = {
"running": "🟢",
"waiting": "🟡",
"completed": "",
"failed": "",
"stopped": "⏹️",
"stopping": "⏸️",
}
status_icon = status_indicators.get(status, "🔵")
agent_name = f"{status_icon} {escape_markup(agent_name_raw)}"
new_node = new_parent.add(
agent_name,
data=node_to_copy.data,
)
new_node.allow_expand = node_to_copy.allow_expand
self.agent_nodes[agent_id] = new_node
for child in node_to_copy.children:
self._copy_node_under(child, new_node)
if node_to_copy.is_expanded:
new_node.expand()
def _reorganize_orphaned_agents(self, new_parent_id: str) -> None:
agents_to_move = []
for agent_id, agent_data in self.tracer.agents.items():
if (
agent_data.get("parent_id") == new_parent_id
and agent_id in self.agent_nodes
and agent_id != new_parent_id
):
agents_to_move.append(agent_id)
if not agents_to_move:
return
parent_node = self.agent_nodes[new_parent_id]
for child_agent_id in agents_to_move:
if child_agent_id in self.agent_nodes:
old_node = self.agent_nodes[child_agent_id]
if old_node.parent is parent_node:
continue
self._copy_node_under(old_node, parent_node)
old_node.remove()
parent_node.allow_expand = True
self._expand_all_agent_nodes()
def _render_chat_content(self, msg_data: dict[str, Any]) -> str:
role = msg_data.get("role")
content = escape_markup(msg_data.get("content", ""))
if not content:
return ""
if role == "user":
from strix.interface.tool_components.user_message_renderer import UserMessageRenderer
return UserMessageRenderer.render_simple(content)
return content
def _render_tool_content_simple(self, tool_data: dict[str, Any]) -> str:
tool_name = tool_data.get("tool_name", "Unknown Tool")
args = tool_data.get("args", {})
status = tool_data.get("status", "unknown")
result = tool_data.get("result")
tool_colors = {
"terminal_execute": "#22c55e",
"browser_action": "#06b6d4",
"python_action": "#3b82f6",
"agents_graph_action": "#fbbf24",
"file_edit_action": "#10b981",
"proxy_action": "#06b6d4",
"notes_action": "#fbbf24",
"thinking_action": "#a855f7",
"web_search_action": "#22c55e",
"finish_action": "#dc2626",
"reporting_action": "#ea580c",
"scan_start_info": "#22c55e",
"subagent_start_info": "#22c55e",
"llm_error_details": "#dc2626",
}
color = tool_colors.get(tool_name, "#737373")
from strix.interface.tool_components.registry import get_tool_renderer
renderer = get_tool_renderer(tool_name)
if renderer:
widget = renderer.render(tool_data)
content = str(widget.renderable)
elif tool_name == "llm_error_details":
lines = ["[red]✗ LLM Request Failed[/red]"]
if args.get("details"):
details = args["details"]
if len(details) > 300:
details = details[:297] + "..."
lines.append(f"[dim]Details:[/dim] {escape_markup(details)}")
content = "\n".join(lines)
else:
status_icons = {
"running": "[yellow]●[/yellow]",
"completed": "[green]✓[/green]",
"failed": "[red]✗[/red]",
"error": "[red]✗[/red]",
}
status_icon = status_icons.get(status, "[dim]○[/dim]")
lines = [f"→ Using tool [bold blue]{escape_markup(tool_name)}[/] {status_icon}"]
if args:
for k, v in list(args.items())[:2]:
str_v = str(v)
if len(str_v) > 80:
str_v = str_v[:77] + "..."
lines.append(f" [dim]{k}:[/] {escape_markup(str_v)}")
if status in ["completed", "failed", "error"] and result:
result_str = str(result)
if len(result_str) > 150:
result_str = result_str[:147] + "..."
lines.append(f"[bold]Result:[/] {escape_markup(result_str)}")
content = "\n".join(lines)
lines = content.split("\n")
bordered_lines = [f"[{color}]▍[/{color}] {line}" for line in lines]
return "\n".join(bordered_lines)
@on(Tree.NodeHighlighted) # type: ignore[misc]
def handle_tree_highlight(self, event: Tree.NodeHighlighted) -> None:
if len(self.screen_stack) > 1 or self.show_splash:
return
if not self.is_mounted:
return
node = event.node
try:
agents_tree = self.query_one("#agents_tree", Tree)
except (ValueError, Exception):
return
if self.focused == agents_tree and node.data:
agent_id = node.data.get("agent_id")
if agent_id:
self.selected_agent_id = agent_id
def _send_user_message(self, message: str) -> None:
if not self.selected_agent_id:
return
if self.tracer:
self.tracer.log_chat_message(
content=message,
role="user",
agent_id=self.selected_agent_id,
)
try:
from strix.tools.agents_graph.agents_graph_actions import send_user_message_to_agent
send_user_message_to_agent(self.selected_agent_id, message)
except (ImportError, AttributeError) as e:
import logging
logging.warning(f"Failed to send message to agent {self.selected_agent_id}: {e}")
self._displayed_events.clear()
self._update_chat_view()
self.call_after_refresh(self._focus_chat_input)
def _get_agent_name(self, agent_id: str) -> str:
try:
if self.tracer and agent_id in self.tracer.agents:
agent_name = self.tracer.agents[agent_id].get("name")
if isinstance(agent_name, str):
return agent_name
except (KeyError, AttributeError) as e:
logging.warning(f"Could not retrieve agent name for {agent_id}: {e}")
return "Unknown Agent"
def action_toggle_help(self) -> None:
if self.show_splash or not self.is_mounted:
return
try:
self.query_one("#main_container")
except (ValueError, Exception):
return
if isinstance(self.screen, HelpScreen):
self.pop_screen()
return
if len(self.screen_stack) > 1:
return
self.push_screen(HelpScreen())
def action_request_quit(self) -> None:
if self.show_splash or not self.is_mounted:
self.action_custom_quit()
return
if len(self.screen_stack) > 1:
return
try:
self.query_one("#main_container")
except (ValueError, Exception):
self.action_custom_quit()
return
self.push_screen(QuitScreen())
def action_stop_selected_agent(self) -> None:
if (
self.show_splash
or not self.is_mounted
or len(self.screen_stack) > 1
or not self.selected_agent_id
):
return
agent_name, should_stop = self._validate_agent_for_stopping()
if not should_stop:
return
try:
self.query_one("#main_container")
except (ValueError, Exception):
return
self.push_screen(StopAgentScreen(agent_name, self.selected_agent_id))
def _validate_agent_for_stopping(self) -> tuple[str, bool]:
agent_name = "Unknown Agent"
try:
if self.tracer and self.selected_agent_id in self.tracer.agents:
agent_data = self.tracer.agents[self.selected_agent_id]
agent_name = agent_data.get("name", "Unknown Agent")
agent_status = agent_data.get("status", "running")
if agent_status not in ["running"]:
return agent_name, False
agent_events = self._gather_agent_events(self.selected_agent_id)
if not agent_events:
return agent_name, False
return agent_name, True
except (KeyError, AttributeError, ValueError) as e:
import logging
logging.warning(f"Failed to gather agent events: {e}")
return agent_name, False
def action_confirm_stop_agent(self, agent_id: str) -> None:
self.pop_screen()
try:
from strix.tools.agents_graph.agents_graph_actions import stop_agent
result = stop_agent(agent_id)
import logging
if result.get("success"):
logging.info(f"Stop request sent to agent: {result.get('message', 'Unknown')}")
else:
logging.warning(f"Failed to stop agent: {result.get('error', 'Unknown error')}")
except Exception:
import logging
logging.exception(f"Failed to stop agent {agent_id}")
def action_custom_quit(self) -> None:
for agent_id in list(self._agent_verb_timers.keys()):
self._stop_agent_verb_timer(agent_id)
if self._scan_thread and self._scan_thread.is_alive():
self._scan_stop_event.set()
self._scan_thread.join(timeout=1.0)
self.tracer.cleanup()
self.exit()
def _is_widget_safe(self, widget: Any) -> bool:
try:
_ = widget.screen
except (AttributeError, ValueError, Exception):
return False
else:
return bool(widget.is_mounted)
def _safe_widget_operation(
self, operation: Callable[..., Any], *args: Any, **kwargs: Any
) -> bool:
try:
operation(*args, **kwargs)
except (AttributeError, ValueError, Exception):
return False
else:
return True
def _update_static_content_safe(self, widget: Static, content: str) -> None:
try:
widget.update(content)
except Exception: # noqa: BLE001
try:
safe_text = Text.from_markup(content)
widget.update(safe_text)
except Exception: # noqa: BLE001
import re
plain_text = re.sub(r"\[.*?\]", "", content)
widget.update(plain_text)
async def run_tui(args: argparse.Namespace) -> None:
"""Run strix in interactive TUI mode with textual."""
app = StrixTUIApp(args)
await app.run_async()