From 198a5e4a618c4df6154e9f9bca11580b48a78c04 Mon Sep 17 00:00:00 2001 From: Ahmed Allam <49919286+0xallam@users.noreply.github.com> Date: Sat, 16 Aug 2025 23:43:29 -0700 Subject: [PATCH] Redesigning the terminal tool (#11) --- containers/Dockerfile | 1 + poetry.lock | 14 +- pyproject.toml | 2 + strix/cli/app.py | 2 +- .../cli/tool_components/terminal_renderer.py | 142 ++++--- strix/tools/argument_parser.py | 3 +- strix/tools/terminal/__init__.py | 4 +- strix/tools/terminal/terminal_actions.py | 62 ++- .../terminal/terminal_actions_schema.xml | 189 +++++----- strix/tools/terminal/terminal_instance.py | 231 ------------ strix/tools/terminal/terminal_manager.py | 212 +++++------ strix/tools/terminal/terminal_session.py | 356 ++++++++++++++++++ 12 files changed, 673 insertions(+), 545 deletions(-) delete mode 100644 strix/tools/terminal/terminal_instance.py create mode 100644 strix/tools/terminal/terminal_session.py diff --git a/containers/Dockerfile b/containers/Dockerfile index 5a5234d..b8cbdeb 100644 --- a/containers/Dockerfile +++ b/containers/Dockerfile @@ -38,6 +38,7 @@ RUN apt-get update && \ nodejs npm pipx \ libcap2-bin \ gdb \ + tmux \ libnss3 libnspr4 libdbus-1-3 libatk1.0-0 libatk-bridge2.0-0 libcups2 libdrm2 libatspi2.0-0 \ libxcomposite1 libxdamage1 libxfixes3 libxrandr2 libgbm1 libxkbcommon0 libpango-1.0-0 libcairo2 libasound2 \ fonts-unifont fonts-noto-color-emoji fonts-freefont-ttf fonts-dejavu-core ttf-bitstream-vera \ diff --git a/poetry.lock b/poetry.lock index 1cd171d..e44dfdc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2311,6 +2311,18 @@ pyyaml = ">=5.2" [package.extras] dev = ["Sphinx (>=5.1.1)", "black (==24.8.0)", "build (>=0.10.0)", "coverage[toml] (>=4.5.4)", "fixit (==2.1.0)", "flake8 (==7.1.1)", "hypothesis (>=4.36.0)", "hypothesmith (>=0.0.4)", "jinja2 (==3.1.4)", "jupyter (>=1.0.0)", "maturin (>=1.7.0,<1.8)", "nbsphinx (>=0.4.2)", "prompt-toolkit (>=2.0.9)", "pyre-check (==0.9.18)", "setuptools-rust (>=1.5.2)", "setuptools-scm (>=6.0.1)", "slotscheck (>=0.7.1)", "sphinx-rtd-theme (>=0.4.3)", "ufmt (==2.7.3)", "usort (==1.0.8.post1)"] +[[package]] +name = "libtmux" +version = "0.46.2" +description = "Typed library that provides an ORM wrapper for tmux, a terminal multiplexer." +optional = false +python-versions = "<4.0,>=3.9" +groups = ["main"] +files = [ + {file = "libtmux-0.46.2-py3-none-any.whl", hash = "sha256:6c32dbf22bde8e5e33b2714a4295f6e838dc640f337cd4c085a044f6828c7793"}, + {file = "libtmux-0.46.2.tar.gz", hash = "sha256:9a398fec5d714129c8344555d466e1a903dfc0f741ba07aabe75a8ceb25c5dda"}, +] + [[package]] name = "linkify-it-py" version = "2.0.3" @@ -6288,4 +6300,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "5bd9cfa879c53b476b1dfdfc7e59b8dff002838a6a83db21b1f0daaba4232819" +content-hash = "631152ff7f5edc5de12cc3ebbd58848a665e2b1a80078b332deb4d33fa6839e6" diff --git a/pyproject.toml b/pyproject.toml index e6afd65..2de982a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ textual = "^4.0.0" xmltodict = "^0.13.0" pyte = "^0.8.1" requests = "^2.32.0" +libtmux = "^0.46.2" [tool.poetry.group.dev.dependencies] # Type checking and static analysis @@ -126,6 +127,7 @@ module = [ "gql.*", "textual.*", "pyte.*", + "libtmux.*", ] ignore_missing_imports = true diff --git a/strix/cli/app.py b/strix/cli/app.py index 95e4be7..286c9ed 100644 --- a/strix/cli/app.py +++ b/strix/cli/app.py @@ -878,7 +878,7 @@ class StrixCLIApp(App): # type: ignore[misc] result = tool_data.get("result") tool_colors = { - "terminal_action": "#22c55e", + "terminal_execute": "#22c55e", "browser_action": "#06b6d4", "python_action": "#3b82f6", "agents_graph_action": "#fbbf24", diff --git a/strix/cli/tool_components/terminal_renderer.py b/strix/cli/tool_components/terminal_renderer.py index f44853c..2902376 100644 --- a/strix/cli/tool_components/terminal_renderer.py +++ b/strix/cli/tool_components/terminal_renderer.py @@ -8,7 +8,7 @@ from .registry import register_tool_renderer @register_tool_renderer class TerminalRenderer(BaseToolRenderer): - tool_name: ClassVar[str] = "terminal_action" + tool_name: ClassVar[str] = "terminal_execute" css_classes: ClassVar[list[str]] = ["tool-call", "terminal-tool"] @classmethod @@ -17,11 +17,12 @@ class TerminalRenderer(BaseToolRenderer): status = tool_data.get("status", "unknown") result = tool_data.get("result", {}) - action = args.get("action", "unknown") - inputs = args.get("inputs", []) + command = args.get("command", "") + is_input = args.get("is_input", False) terminal_id = args.get("terminal_id", "default") + timeout = args.get("timeout") - content = cls._build_sleek_content(action, inputs, terminal_id, result) + content = cls._build_sleek_content(command, is_input, terminal_id, timeout, result) css_classes = cls.get_css_classes(status) return Static(content, classes=css_classes) @@ -29,71 +30,102 @@ class TerminalRenderer(BaseToolRenderer): @classmethod def _build_sleek_content( cls, - action: str, - inputs: list[str], + command: str, + is_input: bool, terminal_id: str, # noqa: ARG003 + timeout: float | None, # noqa: ARG003 result: dict[str, Any], # noqa: ARG003 ) -> str: terminal_icon = ">_" - if action in {"create", "new_terminal"}: - command = cls._format_command(inputs) if inputs else "bash" - return f"{terminal_icon} [#22c55e]${command}[/]" + if not command.strip(): + return f"{terminal_icon} [dim]getting logs...[/]" - if action == "send_input": - command = cls._format_command(inputs) - return f"{terminal_icon} [#22c55e]${command}[/]" + control_sequences = { + "C-c", + "C-d", + "C-z", + "C-a", + "C-e", + "C-k", + "C-l", + "C-u", + "C-w", + "C-r", + "C-s", + "C-t", + "C-y", + "^c", + "^d", + "^z", + "^a", + "^e", + "^k", + "^l", + "^u", + "^w", + "^r", + "^s", + "^t", + "^y", + } + special_keys = { + "Enter", + "Escape", + "Space", + "Tab", + "BTab", + "BSpace", + "DC", + "IC", + "Up", + "Down", + "Left", + "Right", + "Home", + "End", + "PageUp", + "PageDown", + "PgUp", + "PgDn", + "PPage", + "NPage", + "F1", + "F2", + "F3", + "F4", + "F5", + "F6", + "F7", + "F8", + "F9", + "F10", + "F11", + "F12", + } - if action == "wait": - return f"{terminal_icon} [dim]waiting...[/]" + is_special = ( + command in control_sequences + or command in special_keys + or command.startswith(("M-", "S-", "C-S-", "C-M-", "S-M-")) + ) - if action == "close": - return f"{terminal_icon} [dim]close[/]" + if is_special: + return f"{terminal_icon} [#ef4444]{command}[/]" - if action == "get_snapshot": - return f"{terminal_icon} [dim]snapshot[/]" + if is_input: + formatted_command = cls._format_command_display(command) + return f"{terminal_icon} [#3b82f6]>>>[/] [#22c55e]{formatted_command}[/]" - return f"{terminal_icon} [dim]{action}[/]" + formatted_command = cls._format_command_display(command) + return f"{terminal_icon} [#22c55e]$ {formatted_command}[/]" @classmethod - def _format_command(cls, inputs: list[str]) -> str: - if not inputs: + def _format_command_display(cls, command: str) -> str: + if not command: return "" - command_parts = [] - - for input_item in inputs: - if input_item == "Enter": - break - if input_item.startswith("literal:"): - command_parts.append(input_item[8:]) - elif input_item in [ - "Space", - "Tab", - "Backspace", - "Up", - "Down", - "Left", - "Right", - "Home", - "End", - "PageUp", - "PageDown", - "Insert", - "Delete", - "Escape", - ] or input_item.startswith(("^", "C-", "S-", "A-", "F")): - if input_item == "Space": - command_parts.append(" ") - elif input_item == "Tab": - command_parts.append("\t") - continue - else: - command_parts.append(input_item) - - command = "".join(command_parts).strip() - if len(command) > 200: command = command[:197] + "..." - return cls.escape_markup(command) if command else "bash" + return cls.escape_markup(command) diff --git a/strix/tools/argument_parser.py b/strix/tools/argument_parser.py index 06b79f7..0a85f00 100644 --- a/strix/tools/argument_parser.py +++ b/strix/tools/argument_parser.py @@ -1,6 +1,7 @@ import contextlib import inspect import json +import types from collections.abc import Callable from typing import Any, Union, get_args, get_origin @@ -48,7 +49,7 @@ def convert_arguments(func: Callable[..., Any], kwargs: dict[str, Any]) -> dict[ def convert_string_to_type(value: str, param_type: Any) -> Any: origin = get_origin(param_type) - if origin is Union or origin is type(str | None): + if origin is Union or isinstance(param_type, types.UnionType): args = get_args(param_type) for arg_type in args: if arg_type is not type(None): diff --git a/strix/tools/terminal/__init__.py b/strix/tools/terminal/__init__.py index 5b03c72..d53c69a 100644 --- a/strix/tools/terminal/__init__.py +++ b/strix/tools/terminal/__init__.py @@ -1,4 +1,4 @@ -from .terminal_actions import terminal_action +from .terminal_actions import terminal_execute -__all__ = ["terminal_action"] +__all__ = ["terminal_execute"] diff --git a/strix/tools/terminal/terminal_actions.py b/strix/tools/terminal/terminal_actions.py index 05bffd5..1ececa4 100644 --- a/strix/tools/terminal/terminal_actions.py +++ b/strix/tools/terminal/terminal_actions.py @@ -1,53 +1,35 @@ -from typing import Any, Literal +from typing import Any from strix.tools.registry import register_tool from .terminal_manager import get_terminal_manager -TerminalAction = Literal["new_terminal", "send_input", "wait", "close"] - - @register_tool -def terminal_action( - action: TerminalAction, - inputs: list[str] | None = None, - time: float | None = None, +def terminal_execute( + command: str, + is_input: bool = False, + timeout: float | None = None, terminal_id: str | None = None, + no_enter: bool = False, ) -> dict[str, Any]: - def _validate_inputs(action_name: str, inputs: list[str] | None) -> None: - if not inputs: - raise ValueError(f"inputs parameter is required for {action_name} action") - - def _validate_time(time_param: float | None) -> None: - if time_param is None: - raise ValueError("time parameter is required for wait action") - - def _validate_action(action_name: str) -> None: - raise ValueError(f"Unknown action: {action_name}") - manager = get_terminal_manager() try: - match action: - case "new_terminal": - return manager.create_terminal(terminal_id, inputs) - - case "send_input": - _validate_inputs(action, inputs) - assert inputs is not None - return manager.send_input(terminal_id, inputs) - - case "wait": - _validate_time(time) - assert time is not None - return manager.wait_terminal(terminal_id, time) - - case "close": - return manager.close_terminal(terminal_id) - - case _: - _validate_action(action) # type: ignore[unreachable] - + return manager.execute_command( + command=command, + is_input=is_input, + timeout=timeout, + terminal_id=terminal_id, + no_enter=no_enter, + ) except (ValueError, RuntimeError) as e: - return {"error": str(e), "terminal_id": terminal_id, "snapshot": "", "is_running": False} + return { + "error": str(e), + "command": command, + "terminal_id": terminal_id or "default", + "content": "", + "status": "error", + "exit_code": None, + "working_dir": None, + } diff --git a/strix/tools/terminal/terminal_actions_schema.xml b/strix/tools/terminal/terminal_actions_schema.xml index dd5aac4..eda6edc 100644 --- a/strix/tools/terminal/terminal_actions_schema.xml +++ b/strix/tools/terminal/terminal_actions_schema.xml @@ -1,117 +1,130 @@ - - Perform terminal actions using a terminal emulator instance. Each terminal instance - is PERSISTENT and remains active until explicitly closed, allowing for multi-step - workflows and long-running processes. + + Execute a bash command in a persistent terminal session. The terminal maintains state (environment variables, current directory, running processes) between commands. - - The terminal action to perform: - new_terminal: Create a new terminal instance. This MUST be the first action for each terminal tab. - send_input: Send keyboard input to the specified terminal. - wait: Pause execution for specified number of seconds. Can be also used to get the current terminal state (screenshot, output, etc.) after using other tools. - close: Close the specified terminal instance. This MUST be the final action for each terminal tab. + + The bash command to execute. Cannot be empty - must provide a valid command or special key sequence. + + Supported special keys and sequences (based on official tmux key names): + - Control sequences: C-c, C-d, C-z, C-a, C-e, C-k, C-l, C-u, C-w, etc. (also ^c, ^d, etc.) + - Navigation keys: Up, Down, Left, Right, Home, End + - Page keys: PageUp, PageDown, PgUp, PgDn, PPage, NPage + - Function keys: F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12 + - Special keys: Enter, Escape, Space, Tab, BTab, BSpace, DC, IC + - Note: Use official tmux names (BSpace not Backspace, DC not Delete, IC not Insert, Escape not Esc) + - Meta/Alt sequences: M-key (e.g., M-f, M-b) - tmux official modifier + - Shift sequences: S-key (e.g., S-F6, S-Tab, S-Left) + - Combined modifiers: C-S-key, C-M-key, S-M-key, etc. + + These are useful for interacting with vim, emacs, REPLs, and other interactive applications. - - Required for 'new_terminal' and 'send_input' actions: - List of inputs to send to terminal. Each element in the list MUST be one of the following: - Regular text: "hello", "world", etc. - Literal text (not interpreted as special keys): prefix with "literal:" e.g., "literal:Home", "literal:Escape", "literal:Enter" to send these as text - Enter - Space - Backspace - Escape: "Escape", "^[", "C-[" - Tab: "Tab" - Arrow keys: "Left", "Right", "Up", "Down" - Navigation: "Home", "End", "PageUp", "PageDown" - Function keys: "F1" through "F12" Modifier keys supported with prefixes: - ^ or C- : Control (e.g., "^c", "C-c") - S- : Shift (e.g., "S-F6") - A- : Alt (e.g., "A-Home") - Combined modifiers for arrows: "S-A-Up", "C-S-Left" - Inputs MUST in all cases be sent as a LIST of strings, even if you are only sending one input. - Sending Inputs as a single string will NOT work. + + If true, the command is sent as input to a currently running process. If false (default), the command is executed as a new bash command. Use this to interact with running processes. - - Required for 'wait' action. Number of seconds to pause execution. Can be fractional (e.g., 0.5 for half a second). + + Optional timeout in seconds for command execution. If not provided, uses default timeout behavior. Set to higher values for long-running commands like installations or tests. - Identifier for the terminal instance. Required for all actions except the first 'new_terminal' action. Allows managing multiple concurrent terminal tabs. - For 'new_terminal': if not provided, a default terminal is created. If provided, creates a new terminal with that ID. - For other actions: specifies which terminal instance to operate on. - Default terminal ID is "default" if not specified. + Identifier for the terminal session. Defaults to "default". Use different IDs to manage multiple concurrent terminal sessions. + + + If true, don't automatically add Enter/newline after the command. Useful for: + - Interactive prompts where you want to send keys without submitting + - Navigation keys in full-screen applications + + Examples: + - terminal_execute("gg", is_input=true, no_enter=true) # Vim: go to top + - terminal_execute("5j", is_input=true, no_enter=true) # Vim: move down 5 lines + - terminal_execute("i", is_input=true, no_enter=true) # Vim: insert mode - Response containing: - snapshot: raw representation of current terminal state where you can see the output of the command - terminal_id: the ID of the terminal instance that was operated on + Response containing: + - content: Command output + - exit_code: Exit code of the command (only for completed commands) + - command: The executed command + - terminal_id: The terminal session ID + - status: Command status ('completed', 'timeout', 'running') + - working_dir: Current working directory after command execution Important usage rules: - 1. PERSISTENCE: Terminal instances remain active and maintain their state (environment - variables, current directory, running processes) until explicitly closed with the - 'close' action. This allows for multi-step workflows across multiple tool calls. - 2. MULTIPLE TERMINALS: You can run multiple terminal instances concurrently by using - different terminal_id values. Each terminal operates independently. - 3. Terminal interaction MUST begin with 'new_terminal' action for each terminal instance. - 4. Only one action can be performed per call. - 5. Input handling: - - Regular text is sent as-is - - Literal text: prefix with "literal:" to send special key names as literal text - - Special keys must match supported key names - - Modifier combinations follow specific syntax - - Control can be specified as ^ or C- prefix - - Shift (S-) works with special keys only - - Alt (A-) works with any character/key - 6. Wait action: - - Time is specified in seconds - - Can be used to wait for command completion - - Can be fractional (e.g., 0.5 seconds) - - Snapshot and output are captured after the wait - - You should estimate the time it will take to run the command and set the wait time accordingly. - - It can be from a few seconds to a few minutes, choose wisely depending on the command you are running and the task. - 7. The terminal can operate concurrently with other tools. You may invoke - browser, proxy, or other tools (in separate assistant messages) while maintaining - active terminal sessions. - 8. You do not need to close terminals after you are done, but you can if you want to - free up resources. - 9. You MUST end the inputs list with an "Enter" if you want to run the command, as - it is not sent automatically. - 10. AUTOMATIC SPACING BEHAVIOR: - - Consecutive regular text inputs have spaces automatically added between them - - This is helpful for shell commands: ["ls", "-la"] becomes "ls -la" - - This causes problems for compound commands: [":", "w", "q"] becomes ": w q" - - Use "literal:" prefix to bypass spacing: [":", "literal:wq"] becomes ":wq" - - Special keys (Enter, Space, etc.) and literal strings never trigger spacing - 11. WHEN TO USE LITERAL PREFIX: - - Vim commands: [":", "literal:wq", "Enter"] instead of [":", "w", "q", "Enter"] - - Any sequence where exact character positioning matters - - When you need multiple characters sent as a single unit - 12. Do NOT use terminal actions for file editing or writing. Use the replace_in_file, - write_to_file, or read_file tools instead. - 13. PREFER SIMPLE COMMANDS: Avoid complex multiline commands with nested quotes or - complex syntax. Break down complex operations into simpler, individual commands - for better reliability and readability. Never send multiple commands in a single - input list with multiple "Enter" keys - execute one command at a time instead. + 1. PERSISTENT SESSION: The terminal maintains state between commands. Environment variables, + current directory, and running processes persist across multiple tool calls. + + 2. COMMAND EXECUTION: Execute one command at a time. For multiple commands, chain them with + && or ; operators, or make separate tool calls. + + 3. LONG-RUNNING COMMANDS: + - For commands that run indefinitely, run them in background: 'python app.py > server.log 2>&1 &' + - For commands that take time, set appropriate timeout parameter + - Use is_input=true to interact with running processes + + 4. TIMEOUT HANDLING: + - Commands have a default soft timeout (30 seconds of no output changes) + - Set custom timeout for longer operations + - When timeout occurs, you can send empty command to get more output + - Use control sequences (C-c, C-d, C-z) to interrupt processes + + 5. MULTIPLE TERMINALS: Use different terminal_id values to run multiple concurrent sessions. + + 6. INTERACTIVE PROCESSES: Use is_input=true to send input to running processes like: + - Interactive shells, REPLs, or prompts + - Long-running applications waiting for input + - Background processes that need interaction + - Use no_enter=true for stuff like Vim navigation, password typing, or multi-step commands + + 7. WORKING DIRECTORY: The terminal tracks and returns the current working directory. + Use absolute paths or cd commands to change directories as needed. + + 8. OUTPUT HANDLING: Large outputs are automatically truncated. The tool provides + the most relevant parts of the output for analysis. - # Create new terminal with Node.js (default terminal) - - new_terminal - ["node", "Enter"] + # Execute a simple command + + ls -la - # Create a second (parallel) terminal instance for Python - - new_terminal - python_terminal - ["python3", "Enter"] + # Run a command with custom timeout + + npm install + 120 - # Send command to the default terminal - - send_input - ["require('crypto').randomBytes(1000000).toString('hex')", - "Enter"] + # Start a background service + + python app.py > server.log 2>&1 & - # Wait for previous action on default terminal - - wait - 2.0 + # Interact with a running process + + y + true - # Send multiple inputs with special keys to current terminal - - send_input - ["sqlmap -u 'http://example.com/page.php?id=1' --batch", "Enter", "y", - "Enter", "n", "Enter", "n", "Enter"] + # Interrupt a running process + + C-c - # WRONG: Vim command with automatic spacing (becomes ": w q") - - send_input - [":", "w", "q", "Enter"] + # Send Escape key (use official tmux name) + + Escape + true - # CORRECT: Vim command using literal prefix (becomes ":wq") - - send_input - [":", "literal:wq", "Enter"] + # Use a different terminal session + + python3 + python_session + + + # Send input to Python REPL in specific session + + print("Hello World") + true + python_session diff --git a/strix/tools/terminal/terminal_instance.py b/strix/tools/terminal/terminal_instance.py deleted file mode 100644 index f5a70c3..0000000 --- a/strix/tools/terminal/terminal_instance.py +++ /dev/null @@ -1,231 +0,0 @@ -import contextlib -import os -import pty -import select -import signal -import subprocess -import threading -import time -from typing import Any - -import pyte - - -MAX_TERMINAL_SNAPSHOT_LENGTH = 10_000 - - -class TerminalInstance: - def __init__(self, terminal_id: str, initial_command: str | None = None) -> None: - self.terminal_id = terminal_id - self.process: subprocess.Popen[bytes] | None = None - self.master_fd: int | None = None - self.is_running = False - self._output_lock = threading.Lock() - self._reader_thread: threading.Thread | None = None - - self.screen = pyte.HistoryScreen(80, 24, history=1000) - self.stream = pyte.ByteStream() - self.stream.attach(self.screen) - - self._start_terminal(initial_command) - - def _start_terminal(self, initial_command: str | None = None) -> None: - try: - self.master_fd, slave_fd = pty.openpty() - - shell = "/bin/bash" - - self.process = subprocess.Popen( # noqa: S603 - [shell, "-i"], - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - cwd="/workspace", - preexec_fn=os.setsid, # noqa: PLW1509 - Required for PTY functionality - ) - - os.close(slave_fd) - - self.is_running = True - - self._reader_thread = threading.Thread(target=self._read_output, daemon=True) - self._reader_thread.start() - - time.sleep(0.5) - - if initial_command: - self._write_to_terminal(initial_command) - - except (OSError, ValueError) as e: - raise RuntimeError(f"Failed to start terminal: {e}") from e - - def _read_output(self) -> None: - while self.is_running and self.master_fd: - try: - ready, _, _ = select.select([self.master_fd], [], [], 0.1) - if ready: - data = os.read(self.master_fd, 4096) - if data: - with self._output_lock, contextlib.suppress(TypeError): - self.stream.feed(data) - else: - break - except (OSError, ValueError): - break - - def _write_to_terminal(self, data: str) -> None: - if self.master_fd and self.is_running: - try: - os.write(self.master_fd, data.encode("utf-8")) - except (OSError, ValueError) as e: - raise RuntimeError("Terminal is no longer available") from e - - def send_input(self, inputs: list[str]) -> None: - if not self.is_running: - raise RuntimeError("Terminal is not running") - - for i, input_item in enumerate(inputs): - if input_item.startswith("literal:"): - literal_text = input_item[8:] - self._write_to_terminal(literal_text) - else: - key_sequence = self._get_key_sequence(input_item) - if key_sequence: - self._write_to_terminal(key_sequence) - else: - self._write_to_terminal(input_item) - - time.sleep(0.05) - - if ( - i < len(inputs) - 1 - and not input_item.startswith("literal:") - and not self._is_special_key(input_item) - and not inputs[i + 1].startswith("literal:") - and not self._is_special_key(inputs[i + 1]) - ): - self._write_to_terminal(" ") - - def get_snapshot(self) -> dict[str, Any]: - with self._output_lock: - history_lines = [ - "".join(char.data for char in line_dict.values()) - for line_dict in self.screen.history.top - ] - - current_lines = self.screen.display - - all_lines = history_lines + current_lines - rendered_output = "\n".join(all_lines) - - if len(rendered_output) > MAX_TERMINAL_SNAPSHOT_LENGTH: - rendered_output = rendered_output[-MAX_TERMINAL_SNAPSHOT_LENGTH:] - truncated = True - else: - truncated = False - - return { - "terminal_id": self.terminal_id, - "snapshot": rendered_output, - "is_running": self.is_running, - "process_id": self.process.pid if self.process else None, - "truncated": truncated, - } - - def wait(self, duration: float) -> dict[str, Any]: - time.sleep(duration) - return self.get_snapshot() - - def close(self) -> None: - self.is_running = False - - if self.process: - with contextlib.suppress(OSError, ProcessLookupError): - os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) - - try: - self.process.wait(timeout=2) - except subprocess.TimeoutExpired: - os.killpg(os.getpgid(self.process.pid), signal.SIGKILL) - self.process.wait() - - if self.master_fd: - with contextlib.suppress(OSError): - os.close(self.master_fd) - self.master_fd = None - - if self._reader_thread and self._reader_thread.is_alive(): - self._reader_thread.join(timeout=1) - - def _is_special_key(self, key: str) -> bool: - special_keys = { - "Enter", - "Space", - "Backspace", - "Tab", - "Escape", - "Up", - "Down", - "Left", - "Right", - "Home", - "End", - "PageUp", - "PageDown", - "Insert", - "Delete", - } | {f"F{i}" for i in range(1, 13)} - - if key in special_keys: - return True - - return bool(key.startswith(("^", "C-", "S-", "A-"))) - - def _get_key_sequence(self, key: str) -> str | None: - key_map = { - "Enter": "\r", - "Space": " ", - "Backspace": "\x08", - "Tab": "\t", - "Escape": "\x1b", - "Up": "\x1b[A", - "Down": "\x1b[B", - "Right": "\x1b[C", - "Left": "\x1b[D", - "Home": "\x1b[H", - "End": "\x1b[F", - "PageUp": "\x1b[5~", - "PageDown": "\x1b[6~", - "Insert": "\x1b[2~", - "Delete": "\x1b[3~", - "F1": "\x1b[11~", - "F2": "\x1b[12~", - "F3": "\x1b[13~", - "F4": "\x1b[14~", - "F5": "\x1b[15~", - "F6": "\x1b[17~", - "F7": "\x1b[18~", - "F8": "\x1b[19~", - "F9": "\x1b[20~", - "F10": "\x1b[21~", - "F11": "\x1b[23~", - "F12": "\x1b[24~", - } - - if key in key_map: - return key_map[key] - - if key.startswith("^") and len(key) == 2: - char = key[1].lower() - return chr(ord(char) - ord("a") + 1) if "a" <= char <= "z" else None - - if key.startswith("C-") and len(key) == 3: - char = key[2].lower() - return chr(ord(char) - ord("a") + 1) if "a" <= char <= "z" else None - - return None - - def is_alive(self) -> bool: - if not self.process: - return False - return self.process.poll() is None diff --git a/strix/tools/terminal/terminal_manager.py b/strix/tools/terminal/terminal_manager.py index 9d5f32b..95014f0 100644 --- a/strix/tools/terminal/terminal_manager.py +++ b/strix/tools/terminal/terminal_manager.py @@ -5,173 +5,133 @@ import sys import threading from typing import Any -from .terminal_instance import TerminalInstance +from .terminal_session import TerminalSession class TerminalManager: def __init__(self) -> None: - self.terminals: dict[str, TerminalInstance] = {} + self.sessions: dict[str, TerminalSession] = {} self._lock = threading.Lock() self.default_terminal_id = "default" + self.default_timeout = 30.0 self._register_cleanup_handlers() - def create_terminal( - self, terminal_id: str | None = None, inputs: list[str] | None = None + def execute_command( + self, + command: str, + is_input: bool = False, + timeout: float | None = None, + terminal_id: str | None = None, + no_enter: bool = False, ) -> dict[str, Any]: if terminal_id is None: terminal_id = self.default_terminal_id - with self._lock: - if terminal_id in self.terminals: - raise ValueError(f"Terminal '{terminal_id}' already exists") - - initial_command = None - if inputs: - command_parts: list[str] = [] - for input_item in inputs: - if input_item == "Enter": - initial_command = " ".join(command_parts) + "\n" - break - if input_item.startswith("literal:"): - command_parts.append(input_item[8:]) - elif input_item not in [ - "Space", - "Tab", - "Backspace", - ]: - command_parts.append(input_item) - - try: - terminal = TerminalInstance(terminal_id, initial_command) - self.terminals[terminal_id] = terminal - - if inputs and not initial_command: - terminal.send_input(inputs) - result = terminal.wait(2.0) - else: - result = terminal.wait(1.0) - - result["message"] = f"Terminal '{terminal_id}' created successfully" - - except (OSError, ValueError, RuntimeError) as e: - raise RuntimeError(f"Failed to create terminal '{terminal_id}': {e}") from e - else: - return result - - def send_input( - self, terminal_id: str | None = None, inputs: list[str] | None = None - ) -> dict[str, Any]: - if terminal_id is None: - terminal_id = self.default_terminal_id - - if not inputs: - raise ValueError("No inputs provided") - - with self._lock: - if terminal_id not in self.terminals: - raise ValueError(f"Terminal '{terminal_id}' not found") - - terminal = self.terminals[terminal_id] + session = self._get_or_create_session(terminal_id) try: - terminal.send_input(inputs) - result = terminal.wait(2.0) - result["message"] = f"Input sent to terminal '{terminal_id}'" - except (OSError, ValueError, RuntimeError) as e: - raise RuntimeError(f"Failed to send input to terminal '{terminal_id}': {e}") from e - else: - return result + result = session.execute(command, is_input, timeout or self.default_timeout, no_enter) - def wait_terminal( - self, terminal_id: str | None = None, duration: float = 1.0 - ) -> dict[str, Any]: + return { + "content": result["content"], + "command": command, + "terminal_id": terminal_id, + "status": result["status"], + "exit_code": result.get("exit_code"), + "working_dir": result.get("working_dir"), + } + + except RuntimeError as e: + return { + "error": str(e), + "command": command, + "terminal_id": terminal_id, + "content": "", + "status": "error", + "exit_code": None, + "working_dir": None, + } + except OSError as e: + return { + "error": f"System error: {e}", + "command": command, + "terminal_id": terminal_id, + "content": "", + "status": "error", + "exit_code": None, + "working_dir": None, + } + + def _get_or_create_session(self, terminal_id: str) -> TerminalSession: + with self._lock: + if terminal_id not in self.sessions: + self.sessions[terminal_id] = TerminalSession(terminal_id) + return self.sessions[terminal_id] + + def close_session(self, terminal_id: str | None = None) -> dict[str, Any]: if terminal_id is None: terminal_id = self.default_terminal_id with self._lock: - if terminal_id not in self.terminals: - raise ValueError(f"Terminal '{terminal_id}' not found") + if terminal_id not in self.sessions: + return { + "terminal_id": terminal_id, + "message": f"Terminal '{terminal_id}' not found", + "status": "not_found", + } - terminal = self.terminals[terminal_id] + session = self.sessions.pop(terminal_id) try: - result = terminal.wait(duration) - result["message"] = f"Waited {duration}s on terminal '{terminal_id}'" - except (OSError, ValueError, RuntimeError) as e: - raise RuntimeError(f"Failed to wait on terminal '{terminal_id}': {e}") from e - else: - return result - - def close_terminal(self, terminal_id: str | None = None) -> dict[str, Any]: - if terminal_id is None: - terminal_id = self.default_terminal_id - - with self._lock: - if terminal_id not in self.terminals: - raise ValueError(f"Terminal '{terminal_id}' not found") - - terminal = self.terminals.pop(terminal_id) - - try: - terminal.close() - except (OSError, ValueError, RuntimeError) as e: - raise RuntimeError(f"Failed to close terminal '{terminal_id}': {e}") from e + session.close() + except (RuntimeError, OSError) as e: + return { + "terminal_id": terminal_id, + "error": f"Failed to close terminal '{terminal_id}': {e}", + "status": "error", + } else: return { "terminal_id": terminal_id, "message": f"Terminal '{terminal_id}' closed successfully", - "snapshot": "", - "is_running": False, + "status": "closed", } - def get_terminal_snapshot(self, terminal_id: str | None = None) -> dict[str, Any]: - if terminal_id is None: - terminal_id = self.default_terminal_id - + def list_sessions(self) -> dict[str, Any]: with self._lock: - if terminal_id not in self.terminals: - raise ValueError(f"Terminal '{terminal_id}' not found") - - terminal = self.terminals[terminal_id] - - return terminal.get_snapshot() - - def list_terminals(self) -> dict[str, Any]: - with self._lock: - terminal_info = {} - for tid, terminal in self.terminals.items(): - terminal_info[tid] = { - "is_running": terminal.is_running, - "is_alive": terminal.is_alive(), - "process_id": terminal.process.pid if terminal.process else None, + session_info: dict[str, dict[str, Any]] = {} + for tid, session in self.sessions.items(): + session_info[tid] = { + "is_running": session.is_running(), + "working_dir": session.get_working_dir(), } - return {"terminals": terminal_info, "total_count": len(terminal_info)} + return {"sessions": session_info, "total_count": len(session_info)} - def cleanup_dead_terminals(self) -> None: + def cleanup_dead_sessions(self) -> None: with self._lock: - dead_terminals = [] - for tid, terminal in self.terminals.items(): - if not terminal.is_alive(): - dead_terminals.append(tid) + dead_sessions: list[str] = [] + for tid, session in self.sessions.items(): + if not session.is_running(): + dead_sessions.append(tid) - for tid in dead_terminals: - terminal = self.terminals.pop(tid) + for tid in dead_sessions: + session = self.sessions.pop(tid) with contextlib.suppress(Exception): - terminal.close() + session.close() - def close_all_terminals(self) -> None: + def close_all_sessions(self) -> None: with self._lock: - terminals_to_close = list(self.terminals.values()) - self.terminals.clear() + sessions_to_close = list(self.sessions.values()) + self.sessions.clear() - for terminal in terminals_to_close: + for session in sessions_to_close: with contextlib.suppress(Exception): - terminal.close() + session.close() def _register_cleanup_handlers(self) -> None: - atexit.register(self.close_all_terminals) + atexit.register(self.close_all_sessions) signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) @@ -180,7 +140,7 @@ class TerminalManager: signal.signal(signal.SIGHUP, self._signal_handler) def _signal_handler(self, _signum: int, _frame: Any) -> None: - self.close_all_terminals() + self.close_all_sessions() sys.exit(0) diff --git a/strix/tools/terminal/terminal_session.py b/strix/tools/terminal/terminal_session.py new file mode 100644 index 0000000..711d340 --- /dev/null +++ b/strix/tools/terminal/terminal_session.py @@ -0,0 +1,356 @@ +import logging +import re +import time +import uuid +from enum import Enum +from pathlib import Path +from typing import Any + +import libtmux + + +logger = logging.getLogger(__name__) + + +class BashCommandStatus(Enum): + CONTINUE = "continue" + COMPLETED = "completed" + NO_CHANGE_TIMEOUT = "no_change_timeout" + HARD_TIMEOUT = "hard_timeout" + + +def _remove_command_prefix(command_output: str, command: str) -> str: + return command_output.lstrip().removeprefix(command.lstrip()).lstrip() + + +class TerminalSession: + POLL_INTERVAL = 0.5 + HISTORY_LIMIT = 10_000 + PS1_END = "]$ " + + def __init__(self, session_id: str, work_dir: str = "/workspace") -> None: + self.session_id = session_id + self.work_dir = str(Path(work_dir).resolve()) + self._closed = False + self._cwd = self.work_dir + self.NO_CHANGE_TIMEOUT_SECONDS = 30 + + self.server: libtmux.Server | None = None + self.session: libtmux.Session | None = None + self.window: libtmux.Window | None = None + self.pane: libtmux.Pane | None = None + + self.prev_status: BashCommandStatus | None = None + self.prev_output: str = "" + self._initialized = False + + self.initialize() + + @property + def PS1(self) -> str: # noqa: N802 + return r"[STRIX_$?]$ " + + @property + def PS1_PATTERN(self) -> str: # noqa: N802 + return r"\[STRIX_(\d+)\]" + + def initialize(self) -> None: + self.server = libtmux.Server() + + session_name = f"strix-{self.session_id}-{uuid.uuid4()}" + self.session = self.server.new_session( + session_name=session_name, + start_directory=self.work_dir, + kill_session=True, + x=120, + y=30, + ) + + self.session.set_option("history-limit", str(self.HISTORY_LIMIT)) + self.session.history_limit = self.HISTORY_LIMIT + + _initial_window = self.session.active_window + self.window = self.session.new_window( + window_name="bash", + window_shell="/bin/bash", + start_directory=self.work_dir, + ) + self.pane = self.window.active_pane + _initial_window.kill() + + self.pane.send_keys(f'export PROMPT_COMMAND=\'export PS1="{self.PS1}"\'; export PS2=""') + time.sleep(0.1) + self._clear_screen() + + self.prev_status = None + self.prev_output = "" + self._closed = False + + self._cwd = str(Path(self.work_dir).resolve()) + self._initialized = True + + assert self.server is not None + assert self.session is not None + assert self.window is not None + assert self.pane is not None + + def _get_pane_content(self) -> str: + if not self.pane: + raise RuntimeError("Terminal session not properly initialized") + return "\n".join( + line.rstrip() for line in self.pane.cmd("capture-pane", "-J", "-pS", "-").stdout + ) + + def _clear_screen(self) -> None: + if not self.pane: + raise RuntimeError("Terminal session not properly initialized") + self.pane.send_keys("C-l", enter=False) + time.sleep(0.1) + self.pane.cmd("clear-history") + + def _is_control_key(self, command: str) -> bool: + return ( + (command.startswith("C-") and len(command) >= 3) + or (command.startswith("^") and len(command) >= 2) + or (command.startswith("S-") and len(command) >= 3) + or (command.startswith("M-") and len(command) >= 3) + ) + + def _is_function_key(self, command: str) -> bool: + if not command.startswith("F") or len(command) > 3: + return False + try: + num_part = command[1:] + return num_part.isdigit() and 1 <= int(num_part) <= 12 + except (ValueError, IndexError): + return False + + def _is_navigation_or_special_key(self, command: str) -> bool: + navigation_keys = {"Up", "Down", "Left", "Right", "Home", "End"} + special_keys = {"BSpace", "BTab", "DC", "Enter", "Escape", "IC", "Space", "Tab"} + page_keys = {"NPage", "PageDown", "PgDn", "PPage", "PageUp", "PgUp"} + + return command in navigation_keys or command in special_keys or command in page_keys + + def _is_complex_modifier_key(self, command: str) -> bool: + return "-" in command and any( + command.startswith(prefix) + for prefix in ["C-S-", "C-M-", "S-M-", "M-S-", "M-C-", "S-C-"] + ) + + def _is_special_key(self, command: str) -> bool: + _command = command.strip() + + if not _command: + return False + + return ( + self._is_control_key(_command) + or self._is_function_key(_command) + or self._is_navigation_or_special_key(_command) + or self._is_complex_modifier_key(_command) + ) + + def _matches_ps1_metadata(self, content: str) -> list[re.Match[str]]: + return list(re.finditer(self.PS1_PATTERN + r"\]\$ ", content)) + + def _get_command_output( + self, + command: str, + raw_command_output: str, + continue_prefix: str = "", + ) -> str: + if self.prev_output: + command_output = raw_command_output.removeprefix(self.prev_output) + if continue_prefix: + command_output = continue_prefix + command_output + else: + command_output = raw_command_output + self.prev_output = raw_command_output + command_output = _remove_command_prefix(command_output, command) + return command_output.rstrip() + + def _combine_outputs_between_matches( + self, + pane_content: str, + ps1_matches: list[re.Match[str]], + get_content_before_last_match: bool = False, + ) -> str: + if len(ps1_matches) == 1: + if get_content_before_last_match: + return pane_content[: ps1_matches[0].start()] + return pane_content[ps1_matches[0].end() + 1 :] + if len(ps1_matches) == 0: + return pane_content + + combined_output = "" + for i in range(len(ps1_matches) - 1): + output_segment = pane_content[ps1_matches[i].end() + 1 : ps1_matches[i + 1].start()] + combined_output += output_segment + "\n" + combined_output += pane_content[ps1_matches[-1].end() + 1 :] + return combined_output + + def _extract_exit_code_from_matches(self, ps1_matches: list[re.Match[str]]) -> int | None: + if not ps1_matches: + return None + + last_match = ps1_matches[-1] + try: + return int(last_match.group(1)) + except (ValueError, IndexError): + return None + + def execute( + self, command: str, is_input: bool = False, timeout: float = 30.0, no_enter: bool = False + ) -> dict[str, Any]: + if not self._initialized: + raise RuntimeError("Bash session is not initialized") + + if command == "" or command.strip() == "": + return { + "content": ( + "Command cannot be empty - must provide a valid command or control sequence" + ), + "status": "error", + "exit_code": None, + "working_dir": self._cwd, + } + + if ( + self.prev_status + in { + BashCommandStatus.HARD_TIMEOUT, + BashCommandStatus.NO_CHANGE_TIMEOUT, + } + and not is_input + and command != "" + ): + return { + "content": ( + f'Previous command still running. Cannot execute "{command}". ' + "Use is_input=True to interact with running process." + ), + "status": "error", + "exit_code": None, + "working_dir": self._cwd, + } + + initial_pane_output = self._get_pane_content() + initial_ps1_matches = self._matches_ps1_metadata(initial_pane_output) + initial_ps1_count = len(initial_ps1_matches) + + start_time = time.time() + last_change_time = start_time + last_pane_output = initial_pane_output + + if command != "": + if not self.pane: + raise RuntimeError("Terminal session not properly initialized") + is_special_key = self._is_special_key(command) + should_add_enter = not is_special_key and not no_enter + self.pane.send_keys(command, enter=should_add_enter) + + while True: + cur_pane_output = self._get_pane_content() + ps1_matches = self._matches_ps1_metadata(cur_pane_output) + current_ps1_count = len(ps1_matches) + + if cur_pane_output != last_pane_output: + last_pane_output = cur_pane_output + last_change_time = time.time() + + if current_ps1_count > initial_ps1_count or cur_pane_output.rstrip().endswith( + self.PS1_END.rstrip() + ): + exit_code = self._extract_exit_code_from_matches(ps1_matches) + + get_content_before_last_match = bool(len(ps1_matches) == 1) + raw_command_output = self._combine_outputs_between_matches( + cur_pane_output, + ps1_matches, + get_content_before_last_match=get_content_before_last_match, + ) + + command_output = self._get_command_output(command, raw_command_output) + self.prev_status = BashCommandStatus.COMPLETED + self.prev_output = "" + self._ready_for_next_command() + + return { + "content": command_output, + "status": "completed", + "exit_code": exit_code or 0, + "working_dir": self._cwd, + } + + time_since_last_change = time.time() - last_change_time + if time_since_last_change >= self.NO_CHANGE_TIMEOUT_SECONDS: + raw_command_output = self._combine_outputs_between_matches( + cur_pane_output, ps1_matches + ) + command_output = self._get_command_output( + command, + raw_command_output, + continue_prefix="[Below is the output of the previous command.]\n", + ) + self.prev_status = BashCommandStatus.NO_CHANGE_TIMEOUT + + return { + "content": command_output + f"\n[Command timed out - no output change for " + f"{self.NO_CHANGE_TIMEOUT_SECONDS} seconds]", + "status": "timeout", + "exit_code": -1, + "working_dir": self._cwd, + } + + elapsed_time = time.time() - start_time + if elapsed_time >= timeout: + raw_command_output = self._combine_outputs_between_matches( + cur_pane_output, ps1_matches + ) + command_output = self._get_command_output( + command, + raw_command_output, + continue_prefix="[Below is the output of the previous command.]\n", + ) + self.prev_status = BashCommandStatus.HARD_TIMEOUT + + return { + "content": command_output + f"\n[Command timed out after {timeout} seconds]", + "status": "timeout", + "exit_code": -1, + "working_dir": self._cwd, + } + + time.sleep(self.POLL_INTERVAL) + + def _ready_for_next_command(self) -> None: + self._clear_screen() + + def is_running(self) -> bool: + if self._closed or not self.session: + return False + try: + return self.session.id in [s.id for s in self.server.sessions] if self.server else False + except (AttributeError, OSError) as e: + logger.debug("Error checking if session is running: %s", e) + return False + + def get_working_dir(self) -> str: + return self._cwd + + def close(self) -> None: + if self._closed: + return + + if self.session: + try: + self.session.kill() + except (AttributeError, OSError) as e: + logger.debug("Error closing terminal session: %s", e) + + self._closed = True + self.server = None + self.session = None + self.window = None + self.pane = None