Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/content/docs/python-client.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,36 @@ result = await client.query(
)
```

## Run Commands Without the Agent

Use `execute_commands()` when you only need to run shell commands in the sandbox -- no agent invocation, no prompt. The method returns per-command exit codes and raises `AgentRuntimeError` if any command fails.

```python
from runtimeuse_client import (
CommandInterface,
ExecuteCommandsOptions,
RuntimeUseClient,
)

client = RuntimeUseClient(ws_url="ws://localhost:8080")

result = await client.execute_commands(
commands=[
CommandInterface(command="mkdir -p /app/output"),
CommandInterface(command="echo 'sandbox is ready' > /app/output/status.txt"),
CommandInterface(command="cat /app/output/status.txt"),
],
options=ExecuteCommandsOptions(
on_assistant_message=on_assistant_message, # streams stdout/stderr
),
)

for item in result.results:
print(f"{item.command} -> exit {item.exit_code}")
```

`execute_commands()` supports the same callbacks and options as `query()`: streaming via `on_assistant_message`, artifact uploads, cancellation, timeout, and `secrets_to_redact`. Use `pre_execution_downloadables` to fetch files into the sandbox before the commands run.

## Cancel a Run

Call `client.abort()` from another coroutine to cancel an in-flight query. The client sends a cancel message to the runtime and `query()` raises `CancelledException`.
Expand Down
35 changes: 34 additions & 1 deletion packages/runtimeuse-client-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,36 @@ elif isinstance(result.data, StructuredOutputResult):
print(result.metadata) # execution metadata
```

### Command-Only Execution

Use `execute_commands()` when you need to run shell commands in the sandbox without invoking the agent. This is useful for setup steps, health checks, or any workflow where you only need command exit codes.

```python
from runtimeuse_client import (
CommandInterface,
ExecuteCommandsOptions,
RuntimeUseClient,
)

client = RuntimeUseClient(ws_url="ws://localhost:8080")

result = await client.execute_commands(
commands=[
CommandInterface(command="mkdir -p /app/output"),
CommandInterface(command="echo 'sandbox is ready' > /app/output/status.txt"),
CommandInterface(command="cat /app/output/status.txt"),
],
options=ExecuteCommandsOptions(
on_assistant_message=on_assistant, # optional -- streams stdout/stderr
),
)

for item in result.results:
print(f"{item.command} -> exit code {item.exit_code}")
```

`execute_commands()` supports the same streaming, cancellation, timeout, secret redaction, artifact upload, and error semantics as `query()`. If any command exits non-zero, `AgentRuntimeError` is raised.

### Artifact Upload Handshake

When the agent runtime requests an artifact upload, provide a callback that returns a presigned URL and content type. The client sends the response back automatically.
Expand Down Expand Up @@ -184,7 +214,10 @@ except CancelledException:
| `ArtifactUploadRequestMessageInterface` | Runtime requesting a presigned URL for artifact upload |
| `ArtifactUploadResponseMessageInterface` | Response with presigned URL sent back to runtime |
| `ErrorMessageInterface` | Error from the agent runtime |
| `CommandInterface` | Pre/post invocation shell command |
| `ExecuteCommandsOptions` | Configuration for `client.execute_commands()` (callbacks, timeout) |
| `CommandExecutionResult` | Return type of `execute_commands()` (`.results`) |
| `CommandResultItem` | Per-command result (`.command`, `.exit_code`) |
| `CommandInterface` | Shell command to execute (`.command`, `.cwd`) |
| `RuntimeEnvironmentDownloadableInterface` | File to download into the runtime before invocation |

### Exceptions
Expand Down
2 changes: 1 addition & 1 deletion packages/runtimeuse-client-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "runtimeuse-client"
version = "0.7.0"
version = "0.8.0"
description = "Client library for AI agent runtime communication over WebSocket"
readme = "README.md"
license = {"text" = "FSL"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
RuntimeEnvironmentDownloadableInterface,
CommandInterface,
InvocationMessage,
CommandExecutionMessage,
QueryOptions,
ExecuteCommandsOptions,
QueryResult,
CommandResultItem,
CommandExecutionResult,
ResultMessageInterface,
CommandExecutionResultMessageInterface,
TextResult,
StructuredOutputResult,
AssistantMessageInterface,
Expand All @@ -31,9 +36,14 @@
"RuntimeEnvironmentDownloadableInterface",
"CommandInterface",
"InvocationMessage",
"CommandExecutionMessage",
"QueryOptions",
"ExecuteCommandsOptions",
"QueryResult",
"CommandResultItem",
"CommandExecutionResult",
"ResultMessageInterface",
"CommandExecutionResultMessageInterface",
"TextResult",
"StructuredOutputResult",
"AssistantMessageInterface",
Expand Down
146 changes: 146 additions & 0 deletions packages/runtimeuse-client-python/src/runtimeuse_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
from .transports import Transport, WebSocketTransport
from .types import (
InvocationMessage,
CommandExecutionMessage,
AgentRuntimeMessageInterface,
CancelMessage,
ErrorMessageInterface,
ResultMessageInterface,
CommandExecutionResultMessageInterface,
QueryResult,
CommandExecutionResult,
CommandInterface,
AssistantMessageInterface,
ArtifactUploadRequestMessageInterface,
ArtifactUploadResponseMessageInterface,
QueryOptions,
ExecuteCommandsOptions,
)
from .exceptions import AgentRuntimeError, CancelledException

Expand Down Expand Up @@ -198,3 +203,144 @@ async def query(
raise AgentRuntimeError("No result message received")

return QueryResult(data=wire_result.data, metadata=wire_result.metadata)

async def execute_commands(
self,
commands: list[CommandInterface],
options: ExecuteCommandsOptions,
) -> CommandExecutionResult:
"""Execute commands in the runtime without invoking the agent.

Sends a :class:`CommandExecutionMessage`, processes the response
stream, and returns a :class:`CommandExecutionResult` with
per-command exit codes.

Args:
commands: Commands to execute in the runtime environment.
options: Execution configuration including secrets, callbacks,
artifacts, and timeout.

Raises:
AgentRuntimeError: If a command fails or the runtime sends an error.
CancelledException: If cancelled via :meth:`abort`.
TimeoutError: If the timeout is exceeded.
"""
logger = options.logger or _default_logger

self._abort_event = asyncio.Event()

message = CommandExecutionMessage(
message_type="command_execution_message",
source_id=options.source_id,
secrets_to_redact=options.secrets_to_redact,
commands=commands,
artifacts_dir=options.artifacts_dir,
pre_execution_downloadables=options.pre_execution_downloadables,
)

send_queue: asyncio.Queue[dict] = asyncio.Queue()
await send_queue.put(message.model_dump(mode="json"))

wire_result: CommandExecutionResultMessageInterface | None = None

async with asyncio.timeout(options.timeout):
async for msg in self._transport(send_queue=send_queue):
if self._abort_event.is_set():
logger.info("Command execution cancelled by caller")
await send_queue.put(
CancelMessage(message_type="cancel_message").model_dump(
mode="json"
)
)
await send_queue.join()
raise CancelledException("Command execution was cancelled")

try:
message_interface = AgentRuntimeMessageInterface.model_validate(
msg
)
except pydantic.ValidationError:
logger.error(
f"Received unknown message type from agent runtime: {msg}"
)
continue

if (
message_interface.message_type
== "command_execution_result_message"
):
wire_result = (
CommandExecutionResultMessageInterface.model_validate(msg)
)
logger.info(
f"Received command execution result from agent runtime: {msg}"
)
continue

elif message_interface.message_type == "assistant_message":
if options.on_assistant_message is not None:
assistant_message_interface = (
AssistantMessageInterface.model_validate(msg)
)
await options.on_assistant_message(
assistant_message_interface
)
continue

elif message_interface.message_type == "error_message":
try:
error_message_interface = (
ErrorMessageInterface.model_validate(msg)
)
except pydantic.ValidationError:
logger.error(
f"Received malformed error message from agent runtime: {msg}",
)
raise AgentRuntimeError(str(msg))
logger.error(
f"Error from agent runtime: {error_message_interface}",
)
raise AgentRuntimeError(
error_message_interface.error,
metadata=error_message_interface.metadata,
)

elif (
message_interface.message_type
== "artifact_upload_request_message"
):
logger.info(
f"Received artifact upload request message from agent runtime: {msg}"
)
if options.on_artifact_upload_request is not None:
artifact_upload_request_message_interface = (
ArtifactUploadRequestMessageInterface.model_validate(
msg
)
)
upload_result = await options.on_artifact_upload_request(
artifact_upload_request_message_interface
)
artifact_upload_response_message_interface = ArtifactUploadResponseMessageInterface(
message_type="artifact_upload_response_message",
filename=artifact_upload_request_message_interface.filename,
filepath=artifact_upload_request_message_interface.filepath,
presigned_url=upload_result.presigned_url,
content_type=upload_result.content_type,
)
await send_queue.put(
artifact_upload_response_message_interface.model_dump(
mode="json"
)
)
continue

else:
logger.info(
f"Received non-result message from agent runtime: {msg}"
)

if wire_result is None:
raise AgentRuntimeError("No result message received")

return CommandExecutionResult(results=wire_result.results)
58 changes: 58 additions & 0 deletions packages/runtimeuse-client-python/src/runtimeuse_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class AgentRuntimeMessageInterface(BaseModel):
"assistant_message",
"artifact_upload_request_message",
"error_message",
"command_execution_result_message",
]


Expand Down Expand Up @@ -99,6 +100,33 @@ class CancelMessage(BaseModel):
message_type: Literal["cancel_message"]


class CommandExecutionMessage(BaseModel):
message_type: Literal["command_execution_message"]
source_id: str | None = None
secrets_to_redact: list[str] = Field(default_factory=list)
commands: list[CommandInterface]
artifacts_dir: str | None = None
pre_execution_downloadables: list[RuntimeEnvironmentDownloadableInterface] | None = None


class CommandResultItem(BaseModel):
command: str
exit_code: int


class CommandExecutionResult(BaseModel):
"""Result returned by :meth:`RuntimeUseClient.execute_commands`."""

results: list[CommandResultItem]


class CommandExecutionResultMessageInterface(AgentRuntimeMessageInterface):
"""Wire-format result message from command-only execution."""

message_type: Literal["command_execution_result_message"]
results: list[CommandResultItem]


class ArtifactUploadResult(BaseModel):
presigned_url: str
content_type: str
Expand Down Expand Up @@ -156,3 +184,33 @@ def __post_init__(self) -> None:
raise ValueError(
"artifacts_dir and on_artifact_upload_request must be specified together"
)


@dataclass
class ExecuteCommandsOptions:
"""Options for :meth:`RuntimeUseClient.execute_commands`."""

#: Secret values to redact from command output.
secrets_to_redact: list[str] = field(default_factory=list)
#: Caller-defined identifier for tracing/logging purposes.
source_id: str | None = None
#: Directory inside the runtime environment where artifacts are written.
artifacts_dir: str | None = None
#: Files to download into the runtime environment before commands run.
pre_execution_downloadables: list[RuntimeEnvironmentDownloadableInterface] | None = None
#: Called for each assistant (intermediate) message streamed back.
on_assistant_message: OnAssistantMessageCallback | None = None
#: Called when the runtime requests an artifact upload URL.
on_artifact_upload_request: OnArtifactUploadRequestCallback | None = None
#: Overall timeout in seconds. ``None`` means no limit.
timeout: float | None = None
#: Logger instance; falls back to the module-level logger when ``None``.
logger: logging.Logger | None = None

def __post_init__(self) -> None:
has_dir = self.artifacts_dir is not None
has_cb = self.on_artifact_upload_request is not None
if has_dir != has_cb:
raise ValueError(
"artifacts_dir and on_artifact_upload_request must be specified together"
)
12 changes: 11 additions & 1 deletion packages/runtimeuse-client-python/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import dotenv
import pytest

from src.runtimeuse_client import RuntimeUseClient, QueryOptions
from src.runtimeuse_client import RuntimeUseClient, QueryOptions, ExecuteCommandsOptions

dotenv.load_dotenv()

Expand Down Expand Up @@ -75,3 +75,13 @@ def query_options():
def make_query_options():
"""Return the _make_query_options factory for tests that need custom fields."""
return _make_query_options


def _make_execute_commands_options(**overrides: Any) -> ExecuteCommandsOptions:
return ExecuteCommandsOptions(**overrides)


@pytest.fixture
def make_execute_commands_options():
"""Return the _make_execute_commands_options factory for tests."""
return _make_execute_commands_options
Loading