diff --git a/CHANGES b/CHANGES index 4be978a..64f6b36 100644 --- a/CHANGES +++ b/CHANGES @@ -6,15 +6,50 @@ _Notes on upcoming releases will be added here_ +### What's new + +**Typed tmux operation chains with {tooliconl}`run-tmux-plan`** + +{tooliconl}`run-tmux-plan` accepts an ordered list of typed tmux +operations and runs each one over a persistent `tmux -C` control connection. It +returns one typed result per operation, discriminated by `kind`: `capture_pane` +returns its `lines`, `split_pane` returns the new `pane_id`, and the rest return +status only. With `on_error="stop"` (the default) it stops before the next +operation once one fails or its target cannot be resolved, marking the rest +skipped; with `on_error="continue"` it records each failure and runs the rest. +Each pane operation takes one typed `target`, discriminated by `kind`: a +concrete `pane_id` or a `ref` minted by an earlier split. Typed `split_evenly` +and `make_grid` operations build an even row or column or a tiled grid of panes +without hand-written layout strings. A typed `kill_pane` operation closes a +pane, and runs only when the server's safety tier is `destructive` so a +mutating-tier plan cannot smuggle a destructive command past the safety gate. The +tool returns concrete pane IDs captured from referenced splits so later operations +can target them, supports a dry-run mode that returns the planned steps without +touching tmux, applies a per-dispatch timeout, and can roll back panes created +by typed split refs when a later operation fails. Pass `explain=true` to attach +per-dispatch diagnostics under `diagnostics`. +{tooliconl}`call-tools-batch` remains available for workflows that need to call +arbitrary MCP tools instead of this tool's typed operation set. 
+ +**One generic batch tool with {tooliconl}`call-tools-batch`** + +{tooliconl}`call-tools-batch` runs an ordered list of existing MCP tools in a +single call and returns a per-operation result for each, preserving every +nested tool's structured output. Each nested call still runs through the +server's safety tier, and an optional `max_tier` caps the batch below that tier +(`readonly` refuses mutating or destructive nested calls, `mutating` refuses +destructive ones). It replaces the separate readonly, mutating, and destructive +batch tools with one tool plus the `max_tier` argument. + ## libtmux-mcp 0.1.0a14 (2026-06-14) -libtmux-mcp 0.1.0a14 adds tier-aware tool batching. {tooliconl}`call-readonly-tools-batch`, {tooliconl}`call-mutating-tools-batch`, and {tooliconl}`call-destructive-tools-batch` run an ordered list of existing MCP tools in a single call and return a per-operation result for each, preserving every nested tool's own structured output. Each wrapper caps the safety tier of the calls it will make — regardless of the server's `LIBTMUX_SAFETY` tier — and `on_error` selects stop-at-first-failure or continue-and-report handling. Aggregate results stay within the server's response limit. +libtmux-mcp 0.1.0a14 adds tier-aware tool batching. `call_readonly_tools_batch`, `call_mutating_tools_batch`, and `call_destructive_tools_batch` run an ordered list of existing MCP tools in a single call and return a per-operation result for each, preserving every nested tool's own structured output. Each wrapper caps the safety tier of the calls it will make — regardless of the server's `LIBTMUX_SAFETY` tier — and `on_error` selects stop-at-first-failure or continue-and-report handling. Aggregate results stay within the server's response limit. 
### What's new **Tier-aware tool batching** -{tooliconl}`call-readonly-tools-batch`, {tooliconl}`call-mutating-tools-batch`, and {tooliconl}`call-destructive-tools-batch` run an ordered list of existing MCP tools in a single call and return a per-operation result for each, preserving every nested tool's own structured output. Each wrapper caps the safety tier of the calls it will make — the readonly wrapper refuses mutating or destructive operations, and the mutating wrapper refuses destructive ones — regardless of the server's `LIBTMUX_SAFETY` tier. Nested calls keep their normal schema validation, middleware, and safety checks, and `on_error` selects stop-at-first-failure or continue-and-report handling. Large aggregate results stay within the server's response limit — oversized nested payloads are dropped (with the truncation flagged in the result), and very large operation lists are rejected rather than allowed to overflow it. (#79) +`call_readonly_tools_batch`, `call_mutating_tools_batch`, and `call_destructive_tools_batch` run an ordered list of existing MCP tools in a single call and return a per-operation result for each, preserving every nested tool's own structured output. Each wrapper caps the safety tier of the calls it will make — the readonly wrapper refuses mutating or destructive operations, and the mutating wrapper refuses destructive ones — regardless of the server's `LIBTMUX_SAFETY` tier. Nested calls keep their normal schema validation, middleware, and safety checks, and `on_error` selects stop-at-first-failure or continue-and-report handling. Large aggregate results stay within the server's response limit — oversized nested payloads are dropped (with the truncation flagged in the result), and very large operation lists are rejected rather than allowed to overflow it. 
(#79) ## libtmux-mcp 0.1.0a13 (2026-06-13) diff --git a/docs/conf.py b/docs/conf.py index af8bbd1..e19fa76 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -119,6 +119,7 @@ def _patched_tool_collector_tool(self: ToolCollector, **kwargs: t.Any) -> t.Any: conf["fastmcp_tool_modules"] = [ "libtmux_mcp.tools.batch_tools", + "libtmux_mcp.tools.chain_tools", "libtmux_mcp.tools.server_tools", "libtmux_mcp.tools.session_tools", "libtmux_mcp.tools.window_tools", @@ -131,6 +132,7 @@ def _patched_tool_collector_tool(self: ToolCollector, **kwargs: t.Any) -> t.Any: ] conf["fastmcp_area_map"] = { "batch_tools": "batch/index", + "chain_tools": "chain/index", "server_tools": "server/index", "session_tools": "session/index", "window_tools": "window/index", @@ -161,6 +163,23 @@ def _patched_tool_collector_tool(self: ToolCollector, **kwargs: t.Any) -> t.Any: "SendKeysOperation", "SendKeysOperationResult", "SendKeysBatchResult", + "PaneIdTarget", + "RefTarget", + "SplitPaneOperation", + "TmuxSendKeysOperation", + "ResizePaneOperation", + "SelectLayoutOperation", + "SetOptionOperation", + "CapturePaneOperation", + "SplitEvenlyOperation", + "MakeGridOperation", + "KillPaneOperation", + "SplitPaneStepResult", + "CapturePaneStepResult", + "OperationStepResult", + "TmuxOperationDispatchResult", + "RunTmuxDiagnostics", + "RunTmuxPlanResult", "ToolCallOperation", "ToolCallOperationResult", "ToolCallBatchResult", diff --git a/docs/index.md b/docs/index.md index e735b4f..85845ab 100644 --- a/docs/index.md +++ b/docs/index.md @@ -71,19 +71,19 @@ Config blocks for Claude Desktop, Claude Code, Cursor, and others. Read tmux state without changing anything. 
-{toolref}`list-sessions` · {toolref}`capture-pane` · {toolref}`capture-since` · {toolref}`snapshot-pane` · {toolref}`get-pane-info` · {toolref}`find-pane-by-position` · {toolref}`search-panes` · {toolref}`wait-for-text` · {toolref}`wait-for-content-change` · {toolref}`display-message` · {toolref}`call-readonly-tools-batch` +{toolref}`list-sessions` · {toolref}`capture-pane` · {toolref}`capture-since` · {toolref}`snapshot-pane` · {toolref}`get-pane-info` · {toolref}`find-pane-by-position` · {toolref}`search-panes` · {toolref}`wait-for-text` · {toolref}`wait-for-content-change` · {toolref}`display-message` · {toolref}`call-tools-batch` ### Act (mutating) Create or modify tmux objects. -{toolref}`create-session` · {toolref}`send-keys` · {toolref}`send-keys-batch` · {toolref}`run-command` · {toolref}`paste-text` · {toolref}`create-window` · {toolref}`split-window` · {toolref}`select-pane` · {toolref}`select-window` · {toolref}`move-window` · {toolref}`resize-pane` · {toolref}`pipe-pane` · {toolref}`set-option` · {toolref}`call-mutating-tools-batch` +{toolref}`create-session` · {toolref}`send-keys` · {toolref}`send-keys-batch` · {toolref}`run-command` · {toolref}`paste-text` · {toolref}`create-window` · {toolref}`split-window` · {toolref}`select-pane` · {toolref}`select-window` · {toolref}`move-window` · {toolref}`resize-pane` · {toolref}`pipe-pane` · {toolref}`set-option` ### Destroy (destructive) Tear down tmux objects. Not reversible. -{toolref}`kill-session` · {toolref}`kill-window` · {toolref}`kill-pane` · {toolref}`kill-server` · {toolref}`call-destructive-tools-batch` +{toolref}`kill-session` · {toolref}`kill-window` · {toolref}`kill-pane` · {toolref}`kill-server` [Browse all tools →](tools/index) diff --git a/docs/reference/api/tools.md b/docs/reference/api/tools.md index fddc84e..751f038 100644 --- a/docs/reference/api/tools.md +++ b/docs/reference/api/tools.md @@ -9,6 +9,15 @@ :show-inheritance: ``` +## Chain tools + +```{eval-rst} +.. 
automodule:: libtmux_mcp.tools.chain_tools + :members: + :undoc-members: + :show-inheritance: +``` + ## Server tools ```{eval-rst} diff --git a/docs/tools/batch/call-destructive-tools-batch.md b/docs/tools/batch/call-destructive-tools-batch.md deleted file mode 100644 index aad382c..0000000 --- a/docs/tools/batch/call-destructive-tools-batch.md +++ /dev/null @@ -1,32 +0,0 @@ -# Call destructive tools batch - -```{fastmcp-tool} batch_tools.call_destructive_tools_batch -``` - -**Use when** a reviewed workflow intentionally includes destructive -tools and should still return one per-operation result envelope. - -**Avoid when** the workflow can fit inside -{tooliconl}`call-mutating-tools-batch`. This wrapper can invoke -destructive nested tools when the server safety tier permits them. - -**Side effects:** Runs readonly, mutating, and destructive nested tools -in order. Recursive batch calls are rejected. - -**Example:** - -```json -{ - "tool": "call_destructive_tools_batch", - "arguments": { - "operations": [ - {"tool": "kill_pane", "arguments": {"pane_id": "%7"}}, - {"tool": "list_panes", "arguments": {"window_id": "@3"}} - ], - "on_error": "stop" - } -} -``` - -```{fastmcp-tool-input} batch_tools.call_destructive_tools_batch -``` diff --git a/docs/tools/batch/call-mutating-tools-batch.md b/docs/tools/batch/call-mutating-tools-batch.md deleted file mode 100644 index ceb34bc..0000000 --- a/docs/tools/batch/call-mutating-tools-batch.md +++ /dev/null @@ -1,41 +0,0 @@ -# Call mutating tools batch - -```{fastmcp-tool} batch_tools.call_mutating_tools_batch -``` - -**Use when** you need an ordered workflow made from existing typed MCP -tools, such as renaming and splitting a known window, while preserving -each tool's own schema and safety checks. - -**Avoid when** you need tmux's native semicolon command parsing. This -tool batches MCP tools; it does not create one tmux command sequence. -For shell commands with completion and output, prefer -{tooliconl}`run-command`. 
- -**Side effects:** Runs readonly and mutating nested tools in order. -Destructive nested tools are rejected even when the server process is -running with `LIBTMUX_SAFETY=destructive`. - -**Example:** - -```json -{ - "tool": "call_mutating_tools_batch", - "arguments": { - "operations": [ - { - "tool": "rename_window", - "arguments": {"window_id": "@2", "new_name": "logs"} - }, - { - "tool": "split_window", - "arguments": {"window_id": "@2", "direction": "right"} - } - ], - "on_error": "stop" - } -} -``` - -```{fastmcp-tool-input} batch_tools.call_mutating_tools_batch -``` diff --git a/docs/tools/batch/call-readonly-tools-batch.md b/docs/tools/batch/call-readonly-tools-batch.md deleted file mode 100644 index 8d32190..0000000 --- a/docs/tools/batch/call-readonly-tools-batch.md +++ /dev/null @@ -1,34 +0,0 @@ -# Call readonly tools batch - -```{fastmcp-tool} batch_tools.call_readonly_tools_batch -``` - -**Use when** you need several read-only observations in one ordered -MCP turn, such as listing sessions and then reading server metadata. - -**Avoid when** any nested operation changes tmux state — use -{tooliconl}`call-mutating-tools-batch` for readonly + mutating -workflows, or call the individual tools when each result should be -reviewed before choosing the next action. - -**Side effects:** None beyond the nested readonly tools. Mutating and -destructive nested tools are rejected even when the server process is -running with a higher safety tier. 
- -**Example:** - -```json -{ - "tool": "call_readonly_tools_batch", - "arguments": { - "operations": [ - {"tool": "list_sessions", "arguments": {}}, - {"tool": "get_server_info", "arguments": {}} - ], - "on_error": "stop" - } -} -``` - -```{fastmcp-tool-input} batch_tools.call_readonly_tools_batch -``` diff --git a/docs/tools/batch/call-tools-batch.md b/docs/tools/batch/call-tools-batch.md new file mode 100644 index 0000000..fc33c4f --- /dev/null +++ b/docs/tools/batch/call-tools-batch.md @@ -0,0 +1,39 @@ +# Call tools batch + +```{fastmcp-tool} batch_tools.call_tools_batch +``` + +**Use when** you need an ordered workflow made from existing typed MCP +tools, such as renaming and splitting a known window, while preserving +each tool's own schema and safety checks. + +**Avoid when** the steps are tmux pane or window operations; prefer the +typed {tooliconl}`run-tmux-plan` tool. For shell commands with completion +and output, prefer {tooliconl}`run-command`. + +**Safety:** Each nested call still runs through the server's safety tier, +so the batch can never run a nested tool the tier hides. Set `max_tier` to +cap the batch below the server tier: `readonly` refuses any mutating or +destructive nested call, and `mutating` refuses destructive ones. The +default permits every tier the server already allows. + +**Example:** + +```json +{ + "tool": "call_tools_batch", + "arguments": { + "operations": [ + {"tool": "rename_window", + "arguments": {"window_id": "@2", "new_name": "logs"}}, + {"tool": "split_window", + "arguments": {"window_id": "@2", "direction": "right"}} + ], + "max_tier": "mutating", + "on_error": "stop" + } +} +``` + +```{fastmcp-tool-input} batch_tools.call_tools_batch +``` diff --git a/docs/tools/batch/index.md b/docs/tools/batch/index.md index be5684c..36e33bb 100644 --- a/docs/tools/batch/index.md +++ b/docs/tools/batch/index.md @@ -7,16 +7,8 @@ including `socket_name` when needed. 
::::{grid} 1 1 2 3 :gutter: 2 2 3 3 -:::{grid-item-card} {tooliconl}`call-readonly-tools-batch` -Call readonly tools in order. -::: - -:::{grid-item-card} {tooliconl}`call-mutating-tools-batch` -Call readonly or mutating tools in order. -::: - -:::{grid-item-card} {tooliconl}`call-destructive-tools-batch` -Call readonly, mutating, or destructive tools in order. +:::{grid-item-card} {tooliconl}`call-tools-batch` +Call existing MCP tools in order, with an optional safety-tier cap. ::: :::: @@ -25,7 +17,5 @@ Call readonly, mutating, or destructive tools in order. :hidden: :maxdepth: 1 -call-readonly-tools-batch -call-mutating-tools-batch -call-destructive-tools-batch +call-tools-batch ``` diff --git a/docs/tools/chain/index.md b/docs/tools/chain/index.md new file mode 100644 index 0000000..372ef9d --- /dev/null +++ b/docs/tools/chain/index.md @@ -0,0 +1,23 @@ +# Chain tools + +Chain tools run a typed list of tmux operations over a persistent tmux +control connection, one dispatch per operation, and return one typed +result per step. They are different from batch tools: batch tools call +existing MCP tools one by one, while chain tools take a typed tmux +operation list directly. + +::::{grid} 1 1 2 3 +:gutter: 2 2 3 3 + +:::{grid-item-card} {tooliconl}`run-tmux-plan` +Run a typed plan of tmux operations, one result per step. +::: + +:::: + +```{toctree} +:hidden: +:maxdepth: 1 + +run-tmux-plan +``` diff --git a/docs/tools/chain/run-tmux-plan.md b/docs/tools/chain/run-tmux-plan.md new file mode 100644 index 0000000..89e4ed6 --- /dev/null +++ b/docs/tools/chain/run-tmux-plan.md @@ -0,0 +1,78 @@ +# Run tmux plan + +```{fastmcp-tool} chain_tools.run_tmux_plan +``` + +**Use when** you need several typed tmux operations to run in order over +one persistent tmux control connection, with a typed result per step. + +**Avoid when** you need to call arbitrary MCP tools; use +{tooliconl}`call-tools-batch` for that. Use individual tools +when a workflow has only one step. 
+ +**Execution:** Each operation is dispatched on its own over a persistent +`tmux -C` control connection, so every operation keeps its own result. A +`split_pane` with a `ref` returns the new pane ID in `created_panes`, and +later operations can target it with a `ref` target. + +**Targets:** Each pane operation takes one typed `target`, discriminated by +`kind`: `pane_id` (a concrete `%id`) or `ref` (a name minted by an earlier +`split_pane`). + +**Layouts:** `split_evenly` splits a pane into an even row or column of +`count` panes, and `make_grid` tiles a pane's window into a `rows` by `cols` +grid. Both compile to native splits plus a `select-layout`; use the raw +`select_layout` operation for any other tmux layout. + +**Results:** `steps` carries one typed result per operation, discriminated +by `kind`: `capture_pane` returns its `lines`, `split_pane` returns the +new `pane_id`, and the rest return status only. Each step also carries an +`error` message when it fails. Pass `explain` to attach per-dispatch +diagnostics (rendered argv and raw stdout/stderr) under `diagnostics`. + +**Side effects:** Mutates tmux state according to the submitted +operation list. With `on_error="stop"` (the default), the tool stops +before the next operation once one fails or its target cannot be +resolved, and marks the rest `skipped`. With `on_error="continue"`, +every failure is recorded and the rest still run. + +**Destructive operations:** `kill_pane` is destructive. A plan that +contains it runs that operation only when the server's safety tier is +`destructive`; otherwise the operation fails closed with an error and the +rest of the plan is skipped (or, under `on_error="continue"`, the failure is recorded and the rest still run). + +Set `dry_run` to `true` to compile the operation list and return the +rendered dispatches without touching tmux. Referenced split panes use +deterministic placeholders in `created_panes` until the plan is run for +real. 
+ +`dispatch_timeout` defaults to 10 seconds and bounds how long the tool +waits for each native tmux dispatch. A timed-out dispatch marks the +operation failed with `returncode: null`; because dispatches run in a +worker thread, the underlying tmux work may still finish after the tool +returns. + +Set `rollback_on_error` to `true` to kill panes created by +ref-producing `split_pane` operations when the overall operation list +fails. The result still reports `created_panes`, and adds +`rolled_back_panes` plus `rollback_errors` for cleanup visibility. + +**Example:** + +```json +{ + "tool": "run_tmux_plan", + "arguments": { + "operations": [ + {"kind": "split_pane", "target": {"kind": "pane_id", "pane_id": "%1"}, + "ref": "work"}, + {"kind": "send_keys", "target": {"kind": "ref", "ref": "work"}, + "keys": "uv run pytest"} + ], + "on_error": "stop" + } +} +``` + +```{fastmcp-tool-input} chain_tools.run_tmux_plan +``` diff --git a/docs/tools/index.md b/docs/tools/index.md index 9c37b2f..828619f 100644 --- a/docs/tools/index.md +++ b/docs/tools/index.md @@ -55,9 +55,8 @@ leave socket selection inside each nested tool's arguments. See - Signal a waiter → {tool}`signal-channel` **Batching typed tool calls?** -- Read-only observations → {tool}`call-readonly-tools-batch` -- Ordered readonly + mutating workflows → {tool}`call-mutating-tools-batch` -- Reviewed workflows that include destructive steps → {tool}`call-destructive-tools-batch` +- Native tmux operation chains → {tool}`run-tmux-plan` +- Ordered calls to existing MCP tools → {tool}`call-tools-batch` **Staging multi-line input?** - Stage content → {tool}`load-buffer` @@ -159,10 +158,10 @@ Wait for text to appear in a pane. Get tmux server info. ::: -:::{grid-item-card} call_readonly_tools_batch -:link: call-readonly-tools-batch +:::{grid-item-card} call_tools_batch +:link: call-tools-batch :link-type: ref -Call typed readonly tools in order. +Call existing MCP tools in order, with an optional safety-tier cap. 
::: :::{grid-item-card} list_servers @@ -264,10 +263,10 @@ Send several ordered raw-input operations. Run a shell command and report exit status. ::: -:::{grid-item-card} call_mutating_tools_batch -:link: call-mutating-tools-batch +:::{grid-item-card} run_tmux_plan +:link: run-tmux-plan :link-type: ref -Call typed readonly or mutating tools in order. +Run a typed plan of tmux operations, one result per step. ::: :::{grid-item-card} rename_session @@ -435,12 +434,6 @@ Destroy a pane. Kill the entire tmux server. ::: -:::{grid-item-card} call_destructive_tools_batch -:link: call-destructive-tools-batch -:link-type: ref -Call typed tools including destructive steps. -::: - :::{grid-item-card} delete_buffer :link: delete-buffer :link-type: ref @@ -455,6 +448,7 @@ Delete an MCP-staged tmux paste buffer. server/index batch/index +chain/index session/index window/index pane/index diff --git a/docs/topics/architecture.md b/docs/topics/architecture.md index a630b00..f371ed5 100644 --- a/docs/topics/architecture.md +++ b/docs/topics/architecture.md @@ -15,7 +15,7 @@ src/libtmux_mcp/ models.py # Pydantic output models middleware.py # Safety, audit, retry, and error-result middleware tools/ - batch_tools.py # call_readonly_tools_batch, call_mutating_tools_batch, call_destructive_tools_batch + batch_tools.py # call_tools_batch server_tools.py # list_servers, list_sessions, create_session, kill_server, get_server_info session_tools.py # list_windows, create_window, rename_session, kill_session window_tools.py # list_panes, split_window, rename_window, kill_window, select_layout, resize_window diff --git a/pyproject.toml b/pyproject.toml index 094f714..7d44b27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,6 +111,9 @@ lint = [ requires = ["hatchling"] build-backend = "hatchling.build" +[tool.uv.sources] +libtmux = { git = "https://github.com/tmux-python/libtmux.git", rev = "05f55e2a05fbc746924d9fbacbaf82766f9c0315" } + [tool.uv.exclude-newer-package] # git-pull packages 
release in lockstep with their workspaces, so a # fresh release blocking on the 3-day cooldown blocks every diff --git a/src/libtmux_mcp/_utils.py b/src/libtmux_mcp/_utils.py index f6a239b..0cf035f 100644 --- a/src/libtmux_mcp/_utils.py +++ b/src/libtmux_mcp/_utils.py @@ -341,6 +341,26 @@ def _caller_is_strictly_on_server( VALID_SAFETY_LEVELS = frozenset({TAG_READONLY, TAG_MUTATING, TAG_DESTRUCTIVE}) + +def _resolve_safety_level(value: str | None) -> str: + """Return the effective safety level for a ``LIBTMUX_SAFETY`` value.""" + if value is None: + return TAG_MUTATING + if value in VALID_SAFETY_LEVELS: + return value + logger.warning( + "invalid LIBTMUX_SAFETY=%r, falling back to %s", + value, + TAG_READONLY, + ) + return TAG_READONLY + + +def effective_safety_level() -> str: + """Resolve the current process safety level from the environment.""" + return _resolve_safety_level(os.environ.get("LIBTMUX_SAFETY")) + + # --------------------------------------------------------------------------- # Reusable annotation presets for tool registration # --------------------------------------------------------------------------- diff --git a/src/libtmux_mcp/models.py b/src/libtmux_mcp/models.py index fc97f71..31bd64c 100644 --- a/src/libtmux_mcp/models.py +++ b/src/libtmux_mcp/models.py @@ -2,9 +2,10 @@ from __future__ import annotations +import enum import typing as t -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, model_validator class SessionInfo(BaseModel): @@ -655,3 +656,361 @@ class ContentChangeResult(BaseModel): changed: bool = Field(description="Whether the content changed before timeout") pane_id: str = Field(description="Pane ID that was polled") elapsed_seconds: float = Field(description="Time spent waiting in seconds") + + +class TmuxOperationStatus(str, enum.Enum): + """Execution status for one typed tmux operation.""" + + SUCCEEDED = "succeeded" + FAILED = "failed" + SKIPPED = "skipped" + PLANNED = 
"planned" + + +class PaneIdTarget(BaseModel): + """Target a concrete pane by its tmux ID.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["pane_id"] = Field( + default="pane_id", + description="Target discriminator.", + ) + pane_id: str = Field(description="Concrete tmux pane ID, e.g. '%1'.") + + +class RefTarget(BaseModel): + """Target a pane created earlier in the same operation list.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["ref"] = Field( + default="ref", + description="Target discriminator.", + ) + ref: str = Field( + description="Reference name captured from an earlier split_pane operation.", + ) + + +PaneTarget: t.TypeAlias = t.Annotated[ + PaneIdTarget | RefTarget, + Field(discriminator="kind"), +] + + +class SplitPaneOperation(BaseModel): + """Split a pane and optionally expose the new pane under ``ref``.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["split_pane"] = Field( + default="split_pane", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to split.") + ref: str | None = Field( + default=None, + description="Reference name for the created pane ID.", + ) + horizontal: bool = Field( + default=False, + description="Split left/right (-h) instead of top/bottom.", + ) + shell: str | None = Field( + default=None, + description="Command to run in the new pane instead of the default shell.", + ) + + +class TmuxSendKeysOperation(BaseModel): + """Send keys to a pane target.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["send_keys"] = Field( + default="send_keys", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to send keys to.") + keys: str = Field(description="Keys or text to send.") + enter: bool = Field(default=True, description="Press Enter after sending keys.") + literal: bool = Field( + default=False, + description="Pass -l so tmux sends keys literally.", + ) + 
suppress_history: bool = Field( + default=False, + description=( + "Prepend a space so the shell ignores the command in history, " + "where the shell honors space-prefixed commands." + ), + ) + + +class ResizePaneOperation(BaseModel): + """Resize a pane by dimensions or zoom toggle.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["resize_pane"] = Field( + default="resize_pane", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to resize.") + height: int | None = Field(default=None, description="New height in lines.") + width: int | None = Field(default=None, description="New width in columns.") + zoom: bool | None = Field(default=None, description="Toggle pane zoom.") + + @model_validator(mode="after") + def _validate_resize(self) -> ResizePaneOperation: + if self.zoom is not None and ( + self.height is not None or self.width is not None + ): + msg = "Cannot combine zoom with height/width." + raise ValueError(msg) + if self.zoom is None and self.height is None and self.width is None: + msg = "Provide height, width, or zoom." + raise ValueError(msg) + return self + + +class SelectLayoutOperation(BaseModel): + """Select a layout for a tmux window.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["select_layout"] = Field( + default="select_layout", + description="Operation discriminator.", + ) + window_id: str = Field(description="Concrete tmux window ID, e.g. 
'@1'.") + layout: str = Field(description="Layout name or custom layout string.") + + +class SetOptionOperation(BaseModel): + """Set a tmux option at server, session, window, or pane scope.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["set_option"] = Field( + default="set_option", + description="Operation discriminator.", + ) + option: str = Field(description="Option name to set.") + value: str = Field(description="Option value.") + scope: t.Literal["server", "session", "window", "pane"] | None = Field( + default=None, + description="Option scope; omitted means server option.", + ) + target: str | None = Field( + default=None, + description="Target identifier for session, window, or pane scoped options.", + ) + global_: bool = Field(default=False, description="Set the global option table.") + + @model_validator(mode="after") + def _validate_target(self) -> SetOptionOperation: + if self.target is not None and self.scope is None: + msg = "scope is required when target is specified." + raise ValueError(msg) + if self.scope in {"session", "window", "pane"} and self.target is None: + msg = "target is required when scope is 'session', 'window', or 'pane'." 
+ raise ValueError(msg) + return self + + +class CapturePaneOperation(BaseModel): + """Capture pane output as a standalone read operation.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["capture_pane"] = Field( + default="capture_pane", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to capture.") + start: int | None = Field(default=None, description="Start capture line.") + end: int | None = Field(default=None, description="End capture line.") + + +class SplitEvenlyOperation(BaseModel): + """Split a pane into an evenly sized row or column of panes.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["split_evenly"] = Field( + default="split_evenly", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to split into even panes.") + count: int = Field(description="Total number of resulting panes.", ge=2) + axis: t.Literal["horizontal", "vertical"] = Field( + default="vertical", + description="Lay the panes out side by side (horizontal) or stacked.", + ) + + +class MakeGridOperation(BaseModel): + """Arrange a pane's window into an evenly tiled grid of panes.""" + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["make_grid"] = Field( + default="make_grid", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane whose window becomes a grid.") + rows: int = Field(description="Grid rows.", ge=1) + cols: int = Field(description="Grid columns.", ge=1) + + @model_validator(mode="after") + def _validate_grid(self) -> MakeGridOperation: + if self.rows * self.cols < 2: + msg = "make_grid must produce at least 2 panes (rows * cols >= 2)." + raise ValueError(msg) + return self + + +class KillPaneOperation(BaseModel): + """Kill a pane. + + Destructive: a plan that contains this operation runs it only when the + server's safety tier is ``destructive``. 
+ """ + + model_config = ConfigDict(extra="forbid") + + kind: t.Literal["kill_pane"] = Field( + default="kill_pane", + description="Operation discriminator.", + ) + target: PaneTarget = Field(description="Pane to kill.") + + +TmuxOperation: t.TypeAlias = t.Annotated[ + SplitPaneOperation + | TmuxSendKeysOperation + | ResizePaneOperation + | SelectLayoutOperation + | SetOptionOperation + | CapturePaneOperation + | SplitEvenlyOperation + | MakeGridOperation + | KillPaneOperation, + Field(discriminator="kind"), +] + + +class SplitPaneStepResult(BaseModel): + """Result for one ``split_pane`` operation.""" + + kind: t.Literal["split_pane"] = Field( + default="split_pane", + description="Operation kind discriminator.", + ) + index: int = Field(description="Zero-based operation index.") + status: TmuxOperationStatus = Field(description="Execution status.") + pane_id: str | None = Field( + default=None, + description="Concrete pane ID created by a ref-producing split, if any.", + ) + error: str | None = Field( + default=None, + description="Failure message when the operation failed.", + ) + + +class CapturePaneStepResult(BaseModel): + """Result for one ``capture_pane`` operation.""" + + kind: t.Literal["capture_pane"] = Field( + default="capture_pane", + description="Operation kind discriminator.", + ) + index: int = Field(description="Zero-based operation index.") + status: TmuxOperationStatus = Field(description="Execution status.") + lines: list[str] | None = Field( + default=None, + description="Captured pane lines on success.", + ) + error: str | None = Field( + default=None, + description="Failure message when the operation failed.", + ) + + +class OperationStepResult(BaseModel): + """Result for an operation that returns status only.""" + + kind: t.Literal[ + "send_keys", + "resize_pane", + "select_layout", + "set_option", + "split_evenly", + "make_grid", + "kill_pane", + ] = Field( + description="Operation kind discriminator.", + ) + index: int = 
Field(description="Zero-based operation index.") + status: TmuxOperationStatus = Field(description="Execution status.") + error: str | None = Field( + default=None, + description="Failure message when the operation failed.", + ) + + +TmuxStepResult: t.TypeAlias = t.Annotated[ + SplitPaneStepResult | CapturePaneStepResult | OperationStepResult, + Field(discriminator="kind"), +] + + +class TmuxOperationDispatchResult(BaseModel): + """Diagnostics for one native tmux dispatch.""" + + index: int = Field(description="Operation index this dispatch ran.") + argv: list[str] = Field(description="Rendered tmux argv.") + returncode: int | None = Field(description="tmux process exit code, if run.") + stdout: list[str] = Field(default_factory=list, description="stdout lines.") + stderr: list[str] = Field(default_factory=list, description="stderr lines.") + + +class RunTmuxDiagnostics(BaseModel): + """Dispatch diagnostics returned only when ``explain`` is set.""" + + dispatch_count: int = Field(description="Number of native tmux dispatches.") + dispatches: list[TmuxOperationDispatchResult] = Field( + description="Native tmux dispatches used to run the operations.", + ) + + +class RunTmuxPlanResult(BaseModel): + """Result of running typed tmux operations.""" + + succeeded: bool = Field(description="False when any operation failed or skipped.") + dry_run: bool = Field( + default=False, + description="True when dispatches were planned but not executed.", + ) + steps: list[TmuxStepResult] = Field( + description="Per-operation results in input order.", + ) + created_panes: dict[str, str] = Field( + default_factory=dict, + description="Mapping of split_pane ref names to concrete pane IDs.", + ) + rolled_back_panes: list[str] = Field( + default_factory=list, + description="Pane IDs killed by rollback_on_error.", + ) + rollback_errors: list[str] = Field( + default_factory=list, + description="Errors raised while rolling back created panes.", + ) + diagnostics: RunTmuxDiagnostics | None = 
Field( + default=None, + description="Dispatch diagnostics, present only when explain is set.", + ) diff --git a/src/libtmux_mcp/server.py b/src/libtmux_mcp/server.py index 849bb26..2c0895a 100644 --- a/src/libtmux_mcp/server.py +++ b/src/libtmux_mcp/server.py @@ -22,7 +22,7 @@ TAG_DESTRUCTIVE, TAG_MUTATING, TAG_READONLY, - VALID_SAFETY_LEVELS, + _resolve_safety_level, _server_cache, ) from libtmux_mcp.middleware import ( @@ -190,20 +190,6 @@ def _build_instructions(safety_level: str = TAG_MUTATING) -> str: return "".join(parts) -def _resolve_safety_level(value: str | None) -> str: - """Return the effective safety level for a ``LIBTMUX_SAFETY`` value.""" - if value is None: - return TAG_MUTATING - if value in VALID_SAFETY_LEVELS: - return value - logger.warning( - "invalid LIBTMUX_SAFETY=%r, falling back to %s", - value, - TAG_READONLY, - ) - return TAG_READONLY - - _safety_level = _resolve_safety_level(os.environ.get("LIBTMUX_SAFETY")) #: Tools covered by the tail-preserving response limiter. 
Only tools diff --git a/src/libtmux_mcp/tools/__init__.py b/src/libtmux_mcp/tools/__init__.py index 7a72f9a..62b6d48 100644 --- a/src/libtmux_mcp/tools/__init__.py +++ b/src/libtmux_mcp/tools/__init__.py @@ -13,6 +13,7 @@ def register_tools(mcp: FastMCP) -> None: from libtmux_mcp.tools import ( batch_tools, buffer_tools, + chain_tools, env_tools, hook_tools, option_tools, @@ -24,6 +25,7 @@ def register_tools(mcp: FastMCP) -> None: ) batch_tools.register(mcp) + chain_tools.register(mcp) server_tools.register(mcp) session_tools.register(mcp) window_tools.register(mcp) diff --git a/src/libtmux_mcp/tools/batch_tools.py b/src/libtmux_mcp/tools/batch_tools.py index 65d760b..e299897 100644 --- a/src/libtmux_mcp/tools/batch_tools.py +++ b/src/libtmux_mcp/tools/batch_tools.py @@ -11,7 +11,6 @@ from pydantic import BaseModel from libtmux_mcp._utils import ( - ANNOTATIONS_RO, TAG_DESTRUCTIVE, TAG_MUTATING, TAG_READONLY, @@ -36,13 +35,7 @@ TAG_DESTRUCTIVE: 2, } -_BATCH_TOOL_NAMES: frozenset[str] = frozenset( - { - "call_readonly_tools_batch", - "call_mutating_tools_batch", - "call_destructive_tools_batch", - } -) +_BATCH_TOOL_NAMES: frozenset[str] = frozenset({"call_tools_batch"}) MAX_BATCH_OPERATIONS = 1_000 @@ -285,81 +278,40 @@ async def _call_tools_batch( @handle_tool_errors_async -async def call_readonly_tools_batch( +async def call_tools_batch( operations: list[ToolCallOperation], on_error: _OnError = "stop", + max_tier: t.Literal["readonly", "mutating", "destructive"] | None = None, ctx: Context | None = None, ) -> ToolCallBatchResult: - """Call readonly MCP tools serially and return per-tool results. - - Use when several read-only observations should be made in one agent - turn. Each nested call still goes through FastMCP validation, - middleware, and safety checks. Mutating and destructive tools are - rejected even if the server process itself is running at a higher - safety tier. + """Call existing MCP tools serially and return per-tool results. 
+ + Use for ordered tmux workflows where every step is an existing typed MCP + tool. Each nested call still goes through FastMCP validation, middleware, + and the server's safety tier, so the batch can never run a nested tool the + server tier hides. ``max_tier`` optionally caps the batch below the server + tier: ``"readonly"`` refuses any mutating or destructive nested call and + ``"mutating"`` refuses destructive ones. The default permits every tier the + server already allows. Prefer the typed run_tmux_plan tool for tmux + operations; reach for this batch only to drive arbitrary registered tools. """ return await _call_tools_batch( operations=operations, on_error=on_error, - max_tier=TAG_READONLY, + max_tier=max_tier if max_tier is not None else TAG_DESTRUCTIVE, ctx=ctx, ) -@handle_tool_errors_async -async def call_mutating_tools_batch( - operations: list[ToolCallOperation], - on_error: _OnError = "stop", - ctx: Context | None = None, -) -> ToolCallBatchResult: - """Call readonly or mutating MCP tools serially and return per-tool results. - - Use for ordered tmux workflows where every step is still an existing - typed MCP tool. Destructive tools are rejected regardless of the - process-wide safety tier. - """ - return await _call_tools_batch( - operations=operations, - on_error=on_error, - max_tier=TAG_MUTATING, - ctx=ctx, - ) - - -@handle_tool_errors_async -async def call_destructive_tools_batch( - operations: list[ToolCallOperation], - on_error: _OnError = "stop", - ctx: Context | None = None, -) -> ToolCallBatchResult: - """Call readonly, mutating, or destructive MCP tools serially. +def register(mcp: FastMCP) -> None: + """Register the generic MCP batch tool. - This wrapper preserves the normal per-tool schemas and middleware - but its tier permits destructive nested operations. Prefer the - narrower readonly or mutating wrappers whenever possible. 
+ Tagged ``readonly`` so it stays callable at every safety tier; each nested + call is still re-checked against the server tier by the safety middleware, + so visibility never widens what the batch can actually run. """ - return await _call_tools_batch( - operations=operations, - on_error=on_error, - max_tier=TAG_DESTRUCTIVE, - ctx=ctx, - ) - - -def register(mcp: FastMCP) -> None: - """Register generic MCP batch tools.""" mcp.tool( - title="Call Readonly Tools Batch", - annotations=ANNOTATIONS_RO, - tags={TAG_READONLY}, - )(call_readonly_tools_batch) - mcp.tool( - title="Call Mutating Tools Batch", - annotations=_ANNOTATIONS_BATCH_SIDE_EFFECTS, - tags={TAG_MUTATING}, - )(call_mutating_tools_batch) - mcp.tool( - title="Call Destructive Tools Batch", + title="Call Tools Batch", annotations=_ANNOTATIONS_BATCH_SIDE_EFFECTS, - tags={TAG_DESTRUCTIVE}, - )(call_destructive_tools_batch) + tags={TAG_READONLY}, + )(call_tools_batch) diff --git a/src/libtmux_mcp/tools/chain_tools.py b/src/libtmux_mcp/tools/chain_tools.py new file mode 100644 index 0000000..c043911 --- /dev/null +++ b/src/libtmux_mcp/tools/chain_tools.py @@ -0,0 +1,710 @@ +"""Typed MCP tool for running tmux operations over a control connection.""" + +from __future__ import annotations + +import asyncio +import dataclasses +import typing as t + +from libtmux._experimental.chain import ( + CommandCall, + CommandChain, + CommandResultLike, + CommandScope, + CommandScopeError, + ControlModeRunner, + validate_command_scope, +) +from pydantic import TypeAdapter + +from libtmux_mcp._utils import ( + ANNOTATIONS_SHELL, + TAG_DESTRUCTIVE, + TAG_MUTATING, + ExpectedToolError, + _get_server, + effective_safety_level, + handle_tool_errors_async, +) +from libtmux_mcp.models import ( + CapturePaneOperation, + CapturePaneStepResult, + KillPaneOperation, + MakeGridOperation, + OperationStepResult, + PaneIdTarget, + PaneTarget, + RefTarget, + ResizePaneOperation, + RunTmuxDiagnostics, + RunTmuxPlanResult, + 
@dataclasses.dataclass
class _CombinedResult:
    """Aggregate of several control-mode command results.

    Exposes the same ``stdout`` / ``stderr`` / ``returncode`` attributes the
    callers read from a single command result.
    """

    stdout: list[str]
    stderr: list[str]
    returncode: int


def _combine_results(
    results: t.Sequence[CommandResultLike],
) -> _CombinedResult:
    """Fold per-command results into one result for the whole operation.

    Output lines are concatenated in command order. The return code is that
    of the first command that exited non-zero, or ``0`` when every command
    succeeded (including the empty case).
    """
    merged_stdout: list[str] = []
    merged_stderr: list[str] = []
    first_failure = 0
    for item in results:
        merged_stdout.extend(item.stdout)
        merged_stderr.extend(item.stderr)
        # Only the earliest failure is reported; later codes are ignored.
        if first_failure == 0 and item.returncode != 0:
            first_failure = item.returncode
    return _CombinedResult(
        stdout=merged_stdout,
        stderr=merged_stderr,
        returncode=first_failure,
    )
def _resolve_target(
    target: PaneTarget,
    created_panes: dict[str, str],
) -> str:
    """Turn a typed pane target into the token passed to tmux ``-t``.

    A ``PaneIdTarget`` already carries a concrete pane id. A ``RefTarget``
    is looked up in ``created_panes``, the mapping of ref names to pane ids
    recorded by earlier splits.

    Raises
    ------
    _CompileError
        When a ``RefTarget`` names a ref that has not been recorded.
    """
    if isinstance(target, PaneIdTarget):
        return target.pane_id
    if isinstance(target, RefTarget):
        ref_name = target.ref
        try:
            pane_token = created_panes[ref_name]
        except KeyError as missing:
            msg = f"unknown ref: {ref_name}"
            raise _CompileError(msg) from missing
        return pane_token
    # Exhaustiveness guard: every PaneTarget variant is handled above.
    assert_never(target)
def _send_keys_calls(
    operation: TmuxSendKeysOperation,
    created_panes: dict[str, str],
) -> tuple[CommandCall, ...]:
    """Lower a typed send-keys operation to ``send-keys`` command calls.

    When ``suppress_history`` is set the keys are prefixed with a single
    space. Non-literal keys go out in one call, with ``Enter`` appended as a
    second argument when requested. Literal keys use ``-l``; because ``-l``
    would also send the word ``Enter`` literally, the Enter key is issued as
    a separate follow-up call.
    """
    pane = _resolve_target(operation.target, created_panes)
    prefix = " " if operation.suppress_history else ""
    payload = prefix + operation.keys

    if not operation.literal:
        key_args = (payload, "Enter") if operation.enter else (payload,)
        return (CommandCall("send-keys", key_args, target=pane),)

    literal_call = CommandCall("send-keys", ("-l", payload), target=pane)
    if not operation.enter:
        return (literal_call,)
    return (
        literal_call,
        CommandCall("send-keys", ("Enter",), target=pane),
    )
def _rebalanced_split_calls(
    target: str,
    split_args: tuple[str, ...],
    layout: str,
    splits: int,
) -> tuple[CommandCall, ...]:
    """Build ``splits`` split-window calls, re-applying ``layout`` after each.

    Every split halves the *target* pane. Issuing all splits back to back
    shrinks that one pane geometrically, and tmux starts refusing with
    "no space for new pane" after only a few splits in an ordinary window.
    Re-applying the layout after each split redistributes the space first,
    so the next split starts from an evenly sized pane. The final call is
    always ``select-layout``, so the end state matches a single trailing
    layout call.

    Parameters
    ----------
    target
        Resolved tmux target token of the pane to split.
    split_args
        Arguments passed to every ``split-window`` call.
    layout
        Layout name passed to every ``select-layout`` call.
    splits
        Number of ``split-window`` calls to emit. With ``0`` or fewer only
        the layout call is returned.
    """
    if splits <= 0:
        return (CommandCall("select-layout", (layout,), target=target),)
    calls: list[CommandCall] = []
    for _ in range(splits):
        calls.append(CommandCall("split-window", split_args, target=target))
        calls.append(CommandCall("select-layout", (layout,), target=target))
    return tuple(calls)


def _split_evenly_calls(
    operation: SplitEvenlyOperation,
    created_panes: dict[str, str],
) -> tuple[CommandCall, ...]:
    """Build splits plus an even layout for a typed split-evenly operation.

    Emits ``count - 1`` splits along the requested axis, each followed by
    the matching ``even-horizontal`` / ``even-vertical`` layout. New panes
    start in the target pane's current directory.

    Raises
    ------
    _CompileError
        When the operation's target ref cannot be resolved.
    """
    horizontal = operation.axis == "horizontal"
    return _rebalanced_split_calls(
        _resolve_target(operation.target, created_panes),
        ("-h" if horizontal else "-v", "-c", "#{pane_current_path}"),
        "even-horizontal" if horizontal else "even-vertical",
        operation.count - 1,
    )


def _make_grid_calls(
    operation: MakeGridOperation,
    created_panes: dict[str, str],
) -> tuple[CommandCall, ...]:
    """Build splits plus a tiled layout for a typed make-grid operation.

    Emits ``rows * cols - 1`` splits, each followed by ``select-layout
    tiled``. New panes start in the target pane's current directory.

    Raises
    ------
    _CompileError
        When the operation's target ref cannot be resolved.
    """
    return _rebalanced_split_calls(
        _resolve_target(operation.target, created_panes),
        ("-c", "#{pane_current_path}"),
        "tiled",
        operation.rows * operation.cols - 1,
    )
"""Build ``kill-pane`` calls for a typed kill operation.""" + return ( + CommandCall( + "kill-pane", + (), + target=_resolve_target(operation.target, created_panes), + ), + ) + + +def _operation_calls( + operation: TmuxOperation, + created_panes: dict[str, str], +) -> tuple[CommandCall, ...]: + """Lower one typed operation to tmux command calls.""" + if ( + isinstance(operation, KillPaneOperation) + and effective_safety_level() != TAG_DESTRUCTIVE + ): + msg = "kill_pane requires the destructive safety tier" + raise _CompileError(msg) + if isinstance(operation, SplitPaneOperation): + calls = _split_calls(operation, created_panes) + elif isinstance(operation, TmuxSendKeysOperation): + calls = _send_keys_calls(operation, created_panes) + elif isinstance(operation, ResizePaneOperation): + calls = _resize_pane_calls(operation, created_panes) + elif isinstance(operation, SelectLayoutOperation): + calls = _select_layout_calls(operation) + elif isinstance(operation, SetOptionOperation): + calls = _set_option_calls(operation) + elif isinstance(operation, CapturePaneOperation): + calls = _capture_pane_calls(operation, created_panes) + elif isinstance(operation, SplitEvenlyOperation): + calls = _split_evenly_calls(operation, created_panes) + elif isinstance(operation, MakeGridOperation): + calls = _make_grid_calls(operation, created_panes) + elif isinstance(operation, KillPaneOperation): + calls = _kill_pane_calls(operation, created_panes) + else: + assert_never(operation) + _validate_operation_scope(operation, calls) + return calls + + +def _calls_argv(calls: tuple[CommandCall, ...]) -> list[str]: + """Render an operation's calls for the dispatch record.""" + if len(calls) == 1: + return list(calls[0].argv()) + return list(CommandChain(calls).argv()) + + +def _run_calls( + runner: ControlModeRunner, + calls: tuple[CommandCall, ...], +) -> tuple[list[str], CommandResultLike]: + """Run one operation's calls over the control connection.""" + results = runner.run_calls(calls) + 
def _planned_pane_ref(ref: str) -> str:
    """Return the deterministic placeholder for a dry-run pane ref.

    A dry run never creates panes, so a ref-producing split has no real pane
    id to record. The placeholder embeds the ref name so that each ref maps
    to a distinct, recognisable token in the planned argv and in
    ``created_panes``. The angle-bracket form cannot be mistaken for a real
    ``%N`` pane id.

    Parameters
    ----------
    ref
        Ref name declared by the split operation.

    Returns
    -------
    str
        ``"<planned:REF>"`` for the given ref name.
    """
    return f"<planned:{ref}>"
def _rollback_created_panes(
    runner: ControlModeRunner,
    pane_ids: list[str],
) -> tuple[list[str], list[str]]:
    """Kill the given panes newest-first and collect what could not be killed.

    Returns a pair ``(killed, failures)``. ``killed`` lists the pane ids
    whose ``kill-pane`` exited ``0``, in the order they were killed.
    ``failures`` holds one ``"<pane_id>: <message>"`` entry per stderr line
    of each failed kill, or a synthetic exit-code message when tmux printed
    nothing.
    """
    killed: list[str] = []
    failures: list[str] = []
    for pane in pane_ids[::-1]:
        outcome = runner.cmd("kill-pane", "-t", pane)
        if outcome.returncode != 0:
            details = list(outcome.stderr)
            if not details:
                details.append(f"kill-pane exited {outcome.returncode}")
            for detail in details:
                failures.append(f"{pane}: {detail}")
        else:
            killed.append(pane)
    return killed, failures
@handle_tool_errors_async
async def run_tmux_plan(
    operations: list[TmuxOperation],
    on_error: t.Literal["stop", "continue"] = "stop",
    dry_run: bool = False,
    dispatch_timeout: float | None = 10.0,
    rollback_on_error: bool = False,
    explain: bool = False,
    socket_name: str | None = None,
) -> RunTmuxPlanResult:
    """Run typed tmux operations, one dispatch per operation.

    Each operation is dispatched on its own over a persistent ``tmux -C``
    control connection, so every operation keeps its own stdout and return
    code. The result carries one typed, per-kind ``steps`` entry per
    operation: ``capture_pane`` returns ``lines``, ``split_pane`` returns
    ``pane_id``, and the rest return status only.

    ``on_error="stop"`` (the default) stops before the next operation once one
    fails or its target cannot be resolved, marking the rest as skipped;
    ``on_error="continue"`` records each failure and runs the rest.
    ``dry_run`` returns the planned steps without touching tmux.
    ``dispatch_timeout`` bounds how long the tool waits for one native tmux
    dispatch; timed-out work may still finish in the background.
    ``rollback_on_error`` kills panes created by ref-producing ``split_pane``
    operations when the overall operation list fails.
    ``explain`` attaches per-dispatch diagnostics (rendered argv and raw
    stdout/stderr) under ``diagnostics``.

    Raises
    ------
    ExpectedToolError
        When ``operations`` is empty, ``on_error`` is not ``"stop"`` or
        ``"continue"``, or ``dispatch_timeout`` is not positive.
    """
    validated = TMUX_OPERATIONS_ADAPTER.validate_python(operations)
    if not validated:
        msg = "operations must not be empty"
        raise ExpectedToolError(msg)
    if on_error not in {"stop", "continue"}:
        msg = "on_error must be 'stop' or 'continue'"
        raise ExpectedToolError(msg)
    if dispatch_timeout is not None and dispatch_timeout <= 0:
        msg = "dispatch_timeout must be greater than 0 or null"
        raise ExpectedToolError(msg)

    # A dry run never opens a control connection.
    runner: ControlModeRunner | None = None
    if not dry_run:
        runner = ControlModeRunner(_get_server(socket_name=socket_name))
    try:
        dispatches: list[TmuxOperationDispatchResult] = []
        outcomes_by_index: dict[int, _Outcome] = {}
        created_panes: dict[str, str] = {}
        # Creation order of pane ids, kept separately so rollback can kill
        # the newest panes first.
        created_pane_order: list[str] = []

        def record_created_pane(ref: str, pane_id: str) -> None:
            created_panes[ref] = pane_id
            if pane_id not in created_pane_order:
                created_pane_order.append(pane_id)

        def skip_rest(start: int) -> None:
            for skip_index, skipped in enumerate(validated[start:], start=start):
                outcomes_by_index[skip_index] = _skipped_outcome(skip_index, skipped)

        index = 0
        while index < len(validated):
            operation = validated[index]
            try:
                calls = _operation_calls(operation, created_panes)
            except _CompileError as exc:
                # Compile failures never reach tmux, so they produce a step
                # outcome but no dispatch record.
                outcomes_by_index[index] = _compile_failure_outcome(
                    index, operation, exc
                )
                if on_error == "stop":
                    skip_rest(index + 1)
                    break
                index += 1
                continue

            capture_created_pane = (
                isinstance(operation, SplitPaneOperation) and operation.ref is not None
            )
            if dry_run:
                planned_pane_id = (
                    _planned_pane_ref(operation.ref)
                    if isinstance(operation, SplitPaneOperation)
                    and operation.ref is not None
                    else None
                )
                dispatch, outcome, created_pane_id = _plan_standalone(
                    index,
                    operation.kind,
                    calls,
                    created_pane_id=planned_pane_id,
                )
            else:
                assert runner is not None
                try:
                    dispatch_coro = asyncio.to_thread(
                        _dispatch_standalone,
                        runner,
                        index,
                        operation.kind,
                        calls,
                        capture_created_pane=capture_created_pane,
                    )
                    if dispatch_timeout is None:
                        dispatch, outcome, created_pane_id = await dispatch_coro
                    else:
                        dispatch, outcome, created_pane_id = await asyncio.wait_for(
                            dispatch_coro,
                            timeout=dispatch_timeout,
                        )
                except (asyncio.TimeoutError, TimeoutError):
                    # ``asyncio.wait_for`` raises ``asyncio.TimeoutError``,
                    # which is only an alias of the builtin ``TimeoutError``
                    # from Python 3.11 on. Catching both keeps a timed-out
                    # dispatch a failed step on 3.10 as well.
                    if dispatch_timeout is None:
                        # No timeout was configured, so this did not come
                        # from ``wait_for``; let the error surface as-is.
                        raise
                    # NOTE(review): the worker thread is not cancelled and
                    # may still be using ``runner`` while later operations,
                    # rollback, or ``runner.close`` run. Confirm that
                    # ControlModeRunner tolerates this.
                    dispatch, outcome, created_pane_id = _timeout_standalone(
                        index,
                        operation.kind,
                        calls,
                        dispatch_timeout,
                    )
            dispatches.append(dispatch)
            outcomes_by_index[index] = outcome
            if capture_created_pane and created_pane_id is not None:
                assert isinstance(operation, SplitPaneOperation)
                assert operation.ref is not None
                record_created_pane(operation.ref, created_pane_id)
            if not _outcome_succeeded(outcome, dry_run=dry_run) and on_error == "stop":
                skip_rest(index + 1)
                break
            index += 1

        outcomes = [outcomes_by_index[index] for index in range(len(validated))]
        succeeded = all(
            _outcome_succeeded(outcome, dry_run=dry_run) for outcome in outcomes
        )
        rolled_back_panes: list[str] = []
        rollback_errors: list[str] = []
        if rollback_on_error and not dry_run and not succeeded and created_pane_order:
            assert runner is not None
            rolled_back_panes, rollback_errors = await asyncio.to_thread(
                _rollback_created_panes,
                runner,
                created_pane_order,
            )
        diagnostics = (
            RunTmuxDiagnostics(dispatch_count=len(dispatches), dispatches=dispatches)
            if explain
            else None
        )
        return RunTmuxPlanResult(
            succeeded=succeeded,
            dry_run=dry_run,
            steps=[_to_step_result(outcome) for outcome in outcomes],
            created_panes=created_panes,
            rolled_back_panes=rolled_back_panes,
            rollback_errors=rollback_errors,
            diagnostics=diagnostics,
        )
    finally:
        # Close the control connection off the event loop, on every path.
        if runner is not None:
            await asyncio.to_thread(runner.close)
+def register(mcp: FastMCP) -> None: + """Register typed chain tools with the MCP instance.""" + mcp.tool( + title="Run tmux Plan", + annotations=ANNOTATIONS_SHELL, + tags={TAG_MUTATING}, + )(run_tmux_plan) diff --git a/src/libtmux_mcp/tools/server_tools.py b/src/libtmux_mcp/tools/server_tools.py index 9b0b9a2..ed1f1f9 100644 --- a/src/libtmux_mcp/tools/server_tools.py +++ b/src/libtmux_mcp/tools/server_tools.py @@ -272,9 +272,7 @@ def _probe_server_by_path(socket_path: pathlib.Path) -> ServerInfo | None: #: ``_BASE_INSTRUCTIONS`` so the two stay in lockstep. SOCKET_NAME_EXEMPT: frozenset[str] = frozenset( { - "call_destructive_tools_batch", - "call_mutating_tools_batch", - "call_readonly_tools_batch", + "call_tools_batch", "list_servers", } ) diff --git a/tests/test_batch_tools.py b/tests/test_batch_tools.py index 96d3115..94b4d2a 100644 --- a/tests/test_batch_tools.py +++ b/tests/test_batch_tools.py @@ -51,37 +51,6 @@ class BatchOperationLimitFixture(t.NamedTuple): ] -class BatchAnnotationFixture(t.NamedTuple): - """Test fixture for generic batch wrapper annotations.""" - - test_id: str - tool_name: str - read_only_hint: bool - destructive_hint: bool - idempotent_hint: bool - open_world_hint: bool - - -BATCH_ANNOTATION_FIXTURES: list[BatchAnnotationFixture] = [ - BatchAnnotationFixture( - test_id="mutating_batch_warns_destructive_open_world", - tool_name="call_mutating_tools_batch", - read_only_hint=False, - destructive_hint=True, - idempotent_hint=False, - open_world_hint=True, - ), - BatchAnnotationFixture( - test_id="destructive_batch_warns_destructive_open_world", - tool_name="call_destructive_tools_batch", - read_only_hint=False, - destructive_hint=True, - idempotent_hint=False, - open_world_hint=True, - ), -] - - def _content_block_to_wire(block: t.Any) -> dict[str, t.Any]: if hasattr(block, "model_dump"): dumped = block.model_dump(mode="json", by_alias=True, exclude_none=True) @@ -98,8 +67,8 @@ def _call_tool_result_wire(result: t.Any) -> dict[str, t.Any]: 
} -def _batch_probe_server() -> FastMCP: - """Build a small FastMCP server with batch tools and tiered probes.""" +def _batch_probe_server(server_tier: str = TAG_DESTRUCTIVE) -> FastMCP: + """Build a small FastMCP server with the batch tool and tiered probes.""" from fastmcp import FastMCP from libtmux_mcp.middleware import SafetyMiddleware, ToolErrorResultMiddleware @@ -109,7 +78,7 @@ def _batch_probe_server() -> FastMCP: name="batch-probe", middleware=[ ToolErrorResultMiddleware(transform_errors=True), - SafetyMiddleware(max_tier=TAG_DESTRUCTIVE), + SafetyMiddleware(max_tier=server_tier), ], ) register_batch_tools(mcp) @@ -137,14 +106,14 @@ def destructive_probe(value: str) -> dict[str, str]: return mcp -def test_call_readonly_tools_batch_preserves_structured_results() -> None: - """The readonly batch wrapper returns per-tool structured content.""" +def test_call_tools_batch_preserves_structured_results() -> None: + """The batch tool returns per-tool structured content.""" from fastmcp import Client async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_readonly_tools_batch", + "call_tools_batch", { "operations": [ { @@ -196,7 +165,7 @@ async def _call() -> t.Any: BATCH_RESPONSE_LIMIT_FIXTURES, ids=[fixture.test_id for fixture in BATCH_RESPONSE_LIMIT_FIXTURES], ) -def test_call_readonly_tools_batch_caps_aggregate_response( +def test_call_tools_batch_caps_aggregate_response( test_id: str, payload_size: int, ) -> None: @@ -211,7 +180,7 @@ def test_call_readonly_tools_batch_caps_aggregate_response( async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_readonly_tools_batch", + "call_tools_batch", { "operations": [ { @@ -271,11 +240,11 @@ async def _call() -> t.Any: BATCH_OPERATION_LIMIT_FIXTURES, ids=[fixture.test_id for fixture in BATCH_OPERATION_LIMIT_FIXTURES], ) -def test_call_readonly_tools_batch_rejects_oversized_operation_count( +def 
test_call_tools_batch_rejects_oversized_operation_count( test_id: str, operation_count: int, ) -> None: - """The batch wrapper rejects requests whose rows alone can exceed the cap.""" + """The batch tool rejects requests whose rows alone can exceed the cap.""" from fastmcp import Client from libtmux_mcp.middleware import DEFAULT_RESPONSE_LIMIT_BYTES @@ -285,7 +254,7 @@ def test_call_readonly_tools_batch_rejects_oversized_operation_count( async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_readonly_tools_batch", + "call_tools_batch", { "operations": [ { @@ -312,15 +281,16 @@ async def _call() -> t.Any: assert "operations must contain at most" in serialized -def test_call_readonly_tools_batch_rejects_mutating_inner_tool() -> None: - """Readonly batching does not tunnel a mutating tool call.""" +def test_call_tools_batch_max_tier_readonly_rejects_mutating_inner_tool() -> None: + """max_tier="readonly" refuses a mutating nested tool below the server tier.""" from fastmcp import Client async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_readonly_tools_batch", + "call_tools_batch", { + "max_tier": "readonly", "operations": [ { "tool": "mutating_probe", @@ -342,15 +312,16 @@ async def _call() -> t.Any: assert "exceeds batch tier readonly" in operation["error"] -def test_call_mutating_tools_batch_rejects_destructive_inner_tool() -> None: - """Mutating batching does not tunnel a destructive tool call.""" +def test_call_tools_batch_max_tier_mutating_rejects_destructive_inner_tool() -> None: + """max_tier="mutating" refuses a destructive nested tool below the server tier.""" from fastmcp import Client async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_mutating_tools_batch", + "call_tools_batch", { + "max_tier": "mutating", "operations": [ { "tool": "destructive_probe", @@ 
-369,14 +340,45 @@ async def _call() -> t.Any: assert "exceeds batch tier mutating" in operation["error"] -def test_call_mutating_tools_batch_continues_after_error() -> None: +def test_call_tools_batch_bounded_by_server_tier() -> None: + """A readonly-tier server blocks a mutating nested tool even with no max_tier. + + The batch tool is registered readonly so it stays callable at every tier, + but each nested call re-runs the safety middleware, so a readonly server + still refuses a mutating nested tool the batch did not cap itself. + """ + from fastmcp import Client + + async def _call() -> t.Any: + async with Client(_batch_probe_server(server_tier=TAG_READONLY)) as client: + return await client.call_tool( + "call_tools_batch", + { + "operations": [ + { + "tool": "mutating_probe", + "arguments": {"value": "changed"}, + } + ], + }, + raise_on_error=False, + ) + + result = asyncio.run(_call()) + + assert result.is_error is False + [operation] = result.structured_content["results"] + assert operation["success"] is False + + +def test_call_tools_batch_continues_after_error() -> None: """Continue mode attempts later operations after a failed tool call.""" from fastmcp import Client async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_mutating_tools_batch", + "call_tools_batch", { "on_error": "continue", "operations": [ @@ -406,17 +408,17 @@ async def _call() -> t.Any: def test_call_tools_batch_rejects_self_invocation() -> None: - """Batch wrappers cannot recursively call batch wrappers.""" + """The batch tool cannot recursively call the batch tool.""" from fastmcp import Client async def _call() -> t.Any: async with Client(_batch_probe_server()) as client: return await client.call_tool( - "call_destructive_tools_batch", + "call_tools_batch", { "operations": [ { - "tool": "call_destructive_tools_batch", + "tool": "call_tools_batch", "arguments": {"operations": []}, } ], @@ -432,26 +434,14 @@ async def 
_call() -> t.Any: assert "cannot call batch tools recursively" in operation["error"] -@pytest.mark.parametrize( - BatchAnnotationFixture._fields, - BATCH_ANNOTATION_FIXTURES, - ids=[fixture.test_id for fixture in BATCH_ANNOTATION_FIXTURES], -) -def test_batch_wrappers_advertise_worst_case_annotations( - test_id: str, - tool_name: str, - read_only_hint: bool, - destructive_hint: bool, - idempotent_hint: bool, - open_world_hint: bool, -) -> None: - """Batch wrappers advertise the strongest hint from their allowed tools.""" +def test_call_tools_batch_advertises_worst_case_annotations() -> None: + """The batch tool advertises possible side effects.""" mcp = _batch_probe_server() - tool = asyncio.run(mcp.get_tool(tool_name)) - assert tool is not None, f"{tool_name} should be registered" - assert tool.annotations is not None, f"{tool_name} should carry annotations" - assert tool.annotations.readOnlyHint is read_only_hint - assert tool.annotations.destructiveHint is destructive_hint - assert tool.annotations.idempotentHint is idempotent_hint - assert tool.annotations.openWorldHint is open_world_hint + tool = asyncio.run(mcp.get_tool("call_tools_batch")) + assert tool is not None + assert tool.annotations is not None + assert tool.annotations.readOnlyHint is False + assert tool.annotations.destructiveHint is True + assert tool.annotations.idempotentHint is False + assert tool.annotations.openWorldHint is True diff --git a/tests/test_chain_tools.py b/tests/test_chain_tools.py new file mode 100644 index 0000000..00c1db0 --- /dev/null +++ b/tests/test_chain_tools.py @@ -0,0 +1,830 @@ +"""Tests for typed tmux operation chains.""" + +from __future__ import annotations + +import asyncio +import time +import typing as t + +import pytest +from libtmux._experimental.chain import CommandScopeError +from pydantic import ValidationError + +from libtmux_mcp._utils import ExpectedToolError +from libtmux_mcp.models import ( + CapturePaneOperation, + CapturePaneStepResult, + 
KillPaneOperation, + MakeGridOperation, + PaneIdTarget, + RefTarget, + RunTmuxPlanResult, + SetOptionOperation, + SplitEvenlyOperation, + SplitPaneOperation, + SplitPaneStepResult, + TmuxOperation, + TmuxOperationStatus, + TmuxSendKeysOperation, +) +from libtmux_mcp.tools import chain_tools +from libtmux_mcp.tools.chain_tools import ( + TMUX_OPERATIONS_ADAPTER, + run_tmux_plan, +) + +if t.TYPE_CHECKING: + import pathlib + + from libtmux.pane import Pane + from libtmux.server import Server + from libtmux.session import Session + + +def _pane_target(pane: Pane) -> PaneIdTarget: + """Return a typed pane-id target for a fixture pane.""" + assert pane.pane_id is not None + return PaneIdTarget(pane_id=pane.pane_id) + + +def test_run_tmux_operations_runs_each_operation( + mcp_session: Session, +) -> None: + """Each operation runs and reports its own typed status.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation(option="@cc_ops_a", value="1", global_=True), + SetOptionOperation(option="@cc_ops_b", value="2", global_=True), + ], + socket_name=server.socket_name, + ), + ) + + assert result.succeeded + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.SUCCEEDED, + TmuxOperationStatus.SUCCEEDED, + ] + assert result.diagnostics is None + assert server.cmd("show-option", "-gv", "@cc_ops_a").stdout == ["1"] + assert server.cmd("show-option", "-gv", "@cc_ops_b").stdout == ["2"] + + +def test_run_tmux_operations_explain_attaches_diagnostics( + mcp_session: Session, +) -> None: + """``explain`` attaches one per-operation dispatch record.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation(option="@cc_ops_x", value="1", global_=True), + SetOptionOperation(option="@cc_ops_y", value="2", global_=True), + ], + explain=True, + socket_name=server.socket_name, + ), + ) + + assert result.succeeded + assert result.diagnostics is not None + assert 
result.diagnostics.dispatch_count == 2 + assert [dispatch.index for dispatch in result.diagnostics.dispatches] == [0, 1] + assert all( + dispatch.argv[0] == "set-option" for dispatch in result.diagnostics.dispatches + ) + + +def test_run_tmux_operations_capture_returns_lines( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """A read operation returns its own captured lines on its own step.""" + from libtmux_mcp.tools.wait_for_tools import wait_for_channel + + channel = "cc_ops_capture" + mcp_pane.send_keys(f"printf 'CC_OPS_CAPTURE\\n'; tmux wait-for -S {channel}") + asyncio.run( + wait_for_channel(channel, timeout=5.0, socket_name=mcp_server.socket_name) + ) + + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation( + option="@cc_ops_before_capture", + value="1", + global_=True, + ), + CapturePaneOperation(target=_pane_target(mcp_pane)), + ], + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + capture = result.steps[1] + assert isinstance(capture, CapturePaneStepResult) + assert capture.lines is not None + assert "CC_OPS_CAPTURE" in "\n".join(capture.lines) + + +def test_run_tmux_operations_captures_split_refs( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """A typed split ref can target later operations without raw commands.""" + from libtmux_mcp.tools.wait_for_tools import wait_for_channel + + channel = "cc_ops_split_ref" + keys = f"printf 'CC_OPS_REF\\n'; tmux wait-for -S {channel}" + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation( + ref="child", + target=_pane_target(mcp_pane), + ), + TmuxSendKeysOperation(target=RefTarget(ref="child"), keys=keys), + ], + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + split = result.steps[0] + assert isinstance(split, SplitPaneStepResult) + new_pane_id = result.created_panes["child"] + assert new_pane_id.startswith("%") + assert split.pane_id == new_pane_id + + asyncio.run( + wait_for_channel(channel, timeout=5.0, 
socket_name=mcp_server.socket_name) + ) + mcp_pane.window.refresh() + new_pane = mcp_pane.window.panes.get(pane_id=new_pane_id) + assert new_pane is not None + assert "CC_OPS_REF" in "\n".join(new_pane.capture_pane()) + + +def test_run_tmux_plan_split_evenly( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """split_evenly creates an even row/column of the requested pane count.""" + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitEvenlyOperation( + target=_pane_target(mcp_pane), + count=3, + axis="horizontal", + ), + ], + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + assert result.steps[0].status == TmuxOperationStatus.SUCCEEDED + mcp_pane.window.refresh() + assert len(mcp_pane.window.panes) == 3 + + +def test_run_tmux_plan_make_grid( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """make_grid tiles a pane's window into rows * cols panes.""" + result = asyncio.run( + run_tmux_plan( + operations=[ + MakeGridOperation(target=_pane_target(mcp_pane), rows=2, cols=2), + ], + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + mcp_pane.window.refresh() + assert len(mcp_pane.window.panes) == 4 + + +def test_run_tmux_plan_send_keys_suppress_history( + mcp_session: Session, +) -> None: + """suppress_history space-prefixes the sent keys.""" + result = asyncio.run( + run_tmux_plan( + operations=[ + TmuxSendKeysOperation( + target=PaneIdTarget(pane_id="%999999"), + keys="secret", + enter=False, + suppress_history=True, + ), + ], + on_error="continue", + explain=True, + socket_name=mcp_session.server.socket_name, + ), + ) + + assert result.diagnostics is not None + assert " secret" in result.diagnostics.dispatches[0].argv + + +def test_run_tmux_plan_kill_pane_requires_destructive_tier( + mcp_session: Session, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """kill_pane fails closed unless the server runs at the destructive tier.""" + monkeypatch.setenv("LIBTMUX_SAFETY", "mutating") + result = 
asyncio.run( + run_tmux_plan( + operations=[KillPaneOperation(target=PaneIdTarget(pane_id="%999999"))], + explain=True, + socket_name=mcp_session.server.socket_name, + ), + ) + + assert not result.succeeded + assert result.diagnostics is not None + assert result.diagnostics.dispatch_count == 0 + assert result.steps[0].status == TmuxOperationStatus.FAILED + assert result.steps[0].error == "kill_pane requires the destructive safety tier" + + +def test_run_tmux_plan_kill_pane_at_destructive_tier( + mcp_server: Server, + mcp_pane: Pane, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """kill_pane removes a pane when the server runs at the destructive tier.""" + monkeypatch.setenv("LIBTMUX_SAFETY", "destructive") + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation(ref="child", target=_pane_target(mcp_pane)), + KillPaneOperation(target=RefTarget(ref="child")), + ], + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + new_pane_id = result.created_panes["child"] + mcp_pane.window.refresh() + pane_ids = [pane.pane_id for pane in mcp_pane.window.panes] + assert new_pane_id not in pane_ids + + +def test_run_tmux_operations_continue_runs_later_ops( + mcp_session: Session, +) -> None: + """Continue mode records each failure and runs the rest.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + TmuxSendKeysOperation( + target=PaneIdTarget(pane_id="%999999"), + keys="bad", + enter=False, + ), + SetOptionOperation( + option="@cc_ops_after_error", + value="set", + global_=True, + ), + ], + on_error="continue", + socket_name=server.socket_name, + ), + ) + + assert not result.succeeded + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.FAILED, + TmuxOperationStatus.SUCCEEDED, + ] + assert server.cmd("show-option", "-gv", "@cc_ops_after_error").stdout == ["set"] + + +def test_run_tmux_operations_stop_halts_after_failure( + mcp_session: Session, +) -> None: + """Stop mode 
(the default) skips every operation after the first failure.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation(option="@cc_ops_cm_a", value="1", global_=True), + TmuxSendKeysOperation( + target=PaneIdTarget(pane_id="%999999"), + keys="bad", + enter=False, + ), + SetOptionOperation(option="@cc_ops_cm_b", value="2", global_=True), + ], + socket_name=server.socket_name, + ), + ) + + assert not result.succeeded + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.SUCCEEDED, + TmuxOperationStatus.FAILED, + TmuxOperationStatus.SKIPPED, + ] + assert result.steps[1].error is not None + assert "%999999" in result.steps[1].error + # The first op ran; the op after the failure never dispatched. + assert server.cmd("show-option", "-gv", "@cc_ops_cm_a").stdout == ["1"] + assert server.cmd("show-option", "-gv", "@cc_ops_cm_b").stdout == [] + + +def test_run_tmux_operations_split_inherits_target_directory( + mcp_session: Session, + tmp_path: pathlib.Path, +) -> None: + """A split's new pane inherits the split target's working directory.""" + server = mcp_session.server + target_dir = str(tmp_path) + created = server.cmd( + "new-window", + "-t", + mcp_session.session_id, + "-P", + "-F", + "#{pane_id}", + "-c", + target_dir, + ) + target_pane_id = created.stdout[0] + target_cwd = server.cmd( + "display-message", + "-t", + target_pane_id, + "-p", + "#{pane_current_path}", + ).stdout + + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation( + ref="child", + target=PaneIdTarget(pane_id=target_pane_id), + ), + ], + socket_name=server.socket_name, + ), + ) + + assert result.succeeded + new_pane_id = result.created_panes["child"] + new_cwd = server.cmd( + "display-message", + "-t", + new_pane_id, + "-p", + "#{pane_current_path}", + ).stdout + assert new_cwd == target_cwd + + +def test_run_tmux_operations_surfaces_libtmux_scope_error( + mcp_session: Session, + monkeypatch: 
pytest.MonkeyPatch, +) -> None: + """The compiler reports a libtmux scope-contract failure as a step failure. + + The contract metadata is static, so this uses monkeypatch instead of a + tmux fixture to simulate libtmux rejecting a command's target scope. + """ + + def fail_scope(command_name: str, target_scope: str) -> None: + msg = f"{command_name} {target_scope} wrong scope from test" + raise CommandScopeError(msg) + + monkeypatch.setattr( + chain_tools, + "validate_command_scope", + fail_scope, + raising=False, + ) + + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation( + option="@cc_ops_contract_error", + value="set", + global_=True, + ), + ], + explain=True, + socket_name=mcp_session.server.socket_name, + ), + ) + + assert not result.succeeded + assert result.diagnostics is not None + assert result.diagnostics.dispatch_count == 0 + assert result.steps[0].status == TmuxOperationStatus.FAILED + assert result.steps[0].error is not None + assert "wrong scope from test" in result.steps[0].error + + +def test_run_tmux_operations_dry_run_plans_without_mutating( + mcp_session: Session, +) -> None: + """Dry-run returns planned steps without changing tmux state.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation(option="@cc_ops_dry_a", value="1", global_=True), + SetOptionOperation(option="@cc_ops_dry_b", value="2", global_=True), + ], + dry_run=True, + explain=True, + socket_name=server.socket_name, + ), + ) + + assert result.succeeded + assert result.dry_run + assert result.diagnostics is not None + assert result.diagnostics.dispatch_count == 2 + assert all( + dispatch.returncode is None for dispatch in result.diagnostics.dispatches + ) + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.PLANNED, + TmuxOperationStatus.PLANNED, + ] + for option in ("@cc_ops_dry_a", "@cc_ops_dry_b"): + assert server.cmd("show-option", "-gv", option).stdout == [] + + +def 
test_run_tmux_operations_dry_run_plans_split_ref( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """Dry-run uses placeholders for pane refs without creating panes.""" + mcp_pane.window.refresh() + pane_count = len(mcp_pane.window.panes) + + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation( + ref="child", + target=_pane_target(mcp_pane), + ), + TmuxSendKeysOperation( + target=RefTarget(ref="child"), + keys="printf 'DRY_RUN_REF\\n'", + ), + ], + dry_run=True, + socket_name=mcp_server.socket_name, + ), + ) + + placeholder = "" + assert result.succeeded + assert result.dry_run + assert result.created_panes == {"child": placeholder} + split = result.steps[0] + assert isinstance(split, SplitPaneStepResult) + assert split.status == TmuxOperationStatus.PLANNED + assert split.pane_id == placeholder + assert result.steps[1].status == TmuxOperationStatus.PLANNED + + mcp_pane.window.refresh() + assert len(mcp_pane.window.panes) == pane_count + + +def test_run_tmux_operations_dry_run_plans_output_ops( + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """Dry-run plans read operations as planned steps.""" + result = asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation( + option="@cc_ops_dry_pending", + value="1", + global_=True, + ), + CapturePaneOperation(target=_pane_target(mcp_pane)), + ], + dry_run=True, + socket_name=mcp_server.socket_name, + ), + ) + + assert result.succeeded + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.PLANNED, + TmuxOperationStatus.PLANNED, + ] + + +def test_run_tmux_operations_dispatch_timeout( + mcp_server: Server, + mcp_pane: Pane, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A dispatch timeout returns a failed per-operation result. + + The dispatch helper is a synchronous wrapper around tmux, so this uses + monkeypatch rather than a blocking tmux command. 
+ """ + + def sleep_dispatch(*args: object, **kwargs: object) -> t.NoReturn: + time.sleep(0.05) + msg = "dispatch should have timed out" + raise AssertionError(msg) + + monkeypatch.setattr(chain_tools, "_dispatch_standalone", sleep_dispatch) + assert mcp_pane.pane_id is not None + + result = asyncio.run( + run_tmux_plan( + operations=[CapturePaneOperation(target=_pane_target(mcp_pane))], + dispatch_timeout=0.001, + explain=True, + socket_name=mcp_server.socket_name, + ), + ) + + assert not result.succeeded + assert result.diagnostics is not None + assert result.diagnostics.dispatch_count == 1 + assert result.diagnostics.dispatches[0].index == 0 + assert result.diagnostics.dispatches[0].returncode is None + assert result.diagnostics.dispatches[0].stderr == [ + "tmux dispatch timed out after 0.001 seconds", + ] + assert result.steps[0].status == TmuxOperationStatus.FAILED + assert result.steps[0].error == "tmux dispatch timed out after 0.001 seconds" + + +class TimeoutValidationCase(t.NamedTuple): + """Case for timeout input validation.""" + + test_id: str + dispatch_timeout: float + + +@pytest.mark.parametrize( + "case", + [ + TimeoutValidationCase(test_id="zero", dispatch_timeout=0.0), + TimeoutValidationCase(test_id="negative", dispatch_timeout=-1.0), + ], + ids=lambda case: case.test_id, +) +def test_run_tmux_operations_dispatch_timeout_validation( + case: TimeoutValidationCase, + mcp_session: Session, +) -> None: + """Dispatch timeout must be positive when set.""" + with pytest.raises(ExpectedToolError, match="dispatch_timeout"): + asyncio.run( + run_tmux_plan( + operations=[ + SetOptionOperation( + option="@cc_ops_timeout_validation", + value="1", + global_=True, + ), + ], + dispatch_timeout=case.dispatch_timeout, + socket_name=mcp_session.server.socket_name, + ), + ) + + +class CompileErrorPathCase(t.NamedTuple): + """Case for branch-local compile error paths.""" + + test_id: str + operations: list[TmuxOperation] + expected_statuses: list[TmuxOperationStatus] 
+ expected_error: str | None + + +@pytest.mark.parametrize( + "case", + [ + CompileErrorPathCase( + test_id="unknown_ref", + operations=[ + TmuxSendKeysOperation( + target=RefTarget(ref="missing"), + keys="bad", + enter=False, + ), + ], + expected_statuses=[TmuxOperationStatus.FAILED], + expected_error="unknown ref: missing", + ), + CompileErrorPathCase( + test_id="failure_before_compile_error", + operations=[ + TmuxSendKeysOperation( + target=PaneIdTarget(pane_id="%999999"), + keys="bad", + enter=False, + ), + TmuxSendKeysOperation( + target=RefTarget(ref="missing"), + keys="bad", + enter=False, + ), + ], + expected_statuses=[ + TmuxOperationStatus.FAILED, + TmuxOperationStatus.SKIPPED, + ], + expected_error=None, + ), + ], + ids=lambda case: case.test_id, +) +def test_run_tmux_operations_compile_error_paths( + case: CompileErrorPathCase, + mcp_session: Session, +) -> None: + """Compile errors report directly; stop mode skips operations after them.""" + result = asyncio.run( + run_tmux_plan( + operations=case.operations, + socket_name=mcp_session.server.socket_name, + ), + ) + + assert not result.succeeded + assert [step.status for step in result.steps] == case.expected_statuses + if case.expected_error is not None: + assert result.steps[0].error == case.expected_error + + +def test_run_tmux_operations_split_failure_skips_later_ops( + mcp_session: Session, +) -> None: + """A failed split skips every later operation under stop mode.""" + server = mcp_session.server + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation( + ref="child", + target=PaneIdTarget(pane_id="%999999"), + ), + TmuxSendKeysOperation( + target=RefTarget(ref="child"), + keys="bad", + enter=False, + ), + SetOptionOperation( + option="@cc_ops_after_split_failure", + value="set", + global_=True, + ), + ], + socket_name=server.socket_name, + ), + ) + + assert not result.succeeded + assert [step.status for step in result.steps] == [ + TmuxOperationStatus.FAILED, + 
TmuxOperationStatus.SKIPPED, + TmuxOperationStatus.SKIPPED, + ] + assert server.cmd("show-option", "-gv", "@cc_ops_after_split_failure").stdout == [] + + +class RollbackCase(t.NamedTuple): + """Case for rollback of created panes.""" + + test_id: str + rollback_on_error: bool + expect_rollback: bool + + +@pytest.mark.parametrize( + "case", + [ + RollbackCase( + test_id="enabled", + rollback_on_error=True, + expect_rollback=True, + ), + RollbackCase( + test_id="disabled", + rollback_on_error=False, + expect_rollback=False, + ), + ], + ids=lambda case: case.test_id, +) +def test_run_tmux_operations_rolls_back_created_panes( + case: RollbackCase, + mcp_server: Server, + mcp_pane: Pane, +) -> None: + """Rollback kills panes created before a later operation fails.""" + result: RunTmuxPlanResult | None = None + try: + result = asyncio.run( + run_tmux_plan( + operations=[ + SplitPaneOperation( + ref="child", + target=_pane_target(mcp_pane), + ), + TmuxSendKeysOperation( + target=PaneIdTarget(pane_id="%999999"), + keys="bad", + enter=False, + ), + ], + rollback_on_error=case.rollback_on_error, + socket_name=mcp_server.socket_name, + ), + ) + + assert not result.succeeded + new_pane_id = result.created_panes["child"] + assert result.rollback_errors == [] + assert result.rolled_back_panes == ( + [new_pane_id] if case.expect_rollback else [] + ) + mcp_pane.window.refresh() + pane_ids = [pane.pane_id for pane in mcp_pane.window.panes] + assert (new_pane_id not in pane_ids) is case.expect_rollback + finally: + if result is not None and not case.expect_rollback: + pane_id = result.created_panes.get("child") + if pane_id is not None: + mcp_server.cmd("kill-pane", "-t", pane_id) + + +class ValidationCase(t.NamedTuple): + """Case for typed operation validation failures.""" + + test_id: str + operations: object + expected_error: type[Exception] + + +@pytest.mark.parametrize( + "case", + [ + ValidationCase( + test_id="empty_operations", + operations=[], + 
expected_error=ExpectedToolError, + ), + ValidationCase( + test_id="unknown_raw_kind", + operations=[{"kind": "kill_server"}], + expected_error=ValidationError, + ), + ValidationCase( + test_id="unknown_target_kind", + operations=[ + {"kind": "send_keys", "keys": "x", "target": {"kind": "bogus"}} + ], + expected_error=ValidationError, + ), + ], + ids=lambda case: case.test_id, +) +def test_run_tmux_operations_validation( + case: ValidationCase, + mcp_session: Session, +) -> None: + """The tool accepts only non-empty typed operation variants.""" + if case.expected_error is ValidationError: + with pytest.raises(case.expected_error): + TMUX_OPERATIONS_ADAPTER.validate_python(case.operations) + return + + with pytest.raises(case.expected_error): + asyncio.run( + run_tmux_plan( + operations=t.cast("list[TmuxOperation]", case.operations), + socket_name=mcp_session.server.socket_name, + ), + ) diff --git a/tests/test_middleware.py b/tests/test_middleware.py index 698db81..2011d48 100644 --- a/tests/test_middleware.py +++ b/tests/test_middleware.py @@ -1140,14 +1140,14 @@ async def _call() -> None: with caplog.at_level(logging.DEBUG, logger="fastmcp.errors"): asyncio.run(_call()) - levels = [ + levels = { r.levelno for r in caplog.records if r.name == "fastmcp.errors" and "Error in tools/call" in r.getMessage() and message_fragment in r.getMessage() - ] - assert levels == [expected_level] + } + assert levels == {expected_level} def test_schema_validation_failure_marked_expected_in_meta() -> None: diff --git a/tests/test_server.py b/tests/test_server.py index 023e406..ecfcdf9 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -162,7 +162,7 @@ def test_resolve_safety_level( expected_level: str, ) -> None: """Safety env values resolve to the server's effective tier.""" - from libtmux_mcp.server import _resolve_safety_level + from libtmux_mcp._utils import _resolve_safety_level assert test_id assert _resolve_safety_level(env_value) == expected_level diff --git 
a/uv.lock b/uv.lock index a3eb04f..40a7f82 100644 --- a/uv.lock +++ b/uv.lock @@ -1179,11 +1179,7 @@ wheels = [ [[package]] name = "libtmux" version = "0.58.1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c7/58/346776e0491ede33e1554a4bff9b545dbe9f3164e45abac483195938a1cf/libtmux-0.58.1.tar.gz", hash = "sha256:a294dd585aa419d4ecce36f3e55df656693743c97a0b5b5bb1e5fea31ada2482", size = 519541, upload-time = "2026-06-17T00:03:31.81Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/22/4d/e44ada32edfe947c40d4dfc596a6f5355400a16d08be06016bd754375e41/libtmux-0.58.1-py3-none-any.whl", hash = "sha256:ab0f47d03a59d674962bc23e36e188fcfa4a82b0f270d474afab519e3076839b", size = 113653, upload-time = "2026-06-17T00:03:30.48Z" }, -] +source = { git = "https://github.com/tmux-python/libtmux.git?rev=05f55e2a05fbc746924d9fbacbaf82766f9c0315#05f55e2a05fbc746924d9fbacbaf82766f9c0315" } [[package]] name = "libtmux-mcp" @@ -1246,7 +1242,7 @@ testing = [ [package.metadata] requires-dist = [ { name = "fastmcp", specifier = ">=3.4.2,<4.0.0" }, - { name = "libtmux", specifier = ">=0.58.0,<1.0" }, + { name = "libtmux", git = "https://github.com/tmux-python/libtmux.git?rev=05f55e2a05fbc746924d9fbacbaf82766f9c0315" }, ] [package.metadata.requires-dev]