Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions crates/openshell-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,48 @@ enum SandboxCommands {
all: bool,
},

/// Execute a command in a running sandbox.
///
/// Runs a command inside an existing sandbox using the gRPC exec endpoint.
/// Output is streamed to the terminal in real-time. The CLI exits with the
/// remote command's exit code.
///
/// For interactive shell sessions, use `sandbox connect` instead.
///
/// Examples:
/// openshell sandbox exec --name my-sandbox -- ls -la /workspace
/// openshell sandbox exec -n my-sandbox --workdir /app -- python script.py
/// echo "hello" | openshell sandbox exec -n my-sandbox -- cat
#[command(help_template = LEAF_HELP_TEMPLATE, next_help_heading = "FLAGS")]
Exec {
/// Sandbox name (defaults to last-used sandbox).
#[arg(long, short = 'n', add = ArgValueCompleter::new(completers::complete_sandbox_names))]
name: Option<String>,

/// Working directory inside the sandbox.
#[arg(long)]
workdir: Option<String>,

/// Timeout in seconds (0 = no timeout).
#[arg(long, default_value_t = 0)]
timeout: u32,

/// Allocate a pseudo-terminal for the remote command.
/// Defaults to auto-detection (on when stdin and stdout are terminals).
/// Use --tty to force a PTY even when auto-detection fails, or
/// --no-tty to disable.
#[arg(long, overrides_with = "no_tty")]
tty: bool,

/// Disable pseudo-terminal allocation.
#[arg(long, overrides_with = "tty")]
no_tty: bool,

/// Command and arguments to execute.
#[arg(required = true, trailing_var_arg = true, allow_hyphen_values = true)]
command: Vec<String>,
},

/// Connect to a sandbox.
///
/// When no name is given, reconnects to the last-used sandbox.
Expand Down Expand Up @@ -2307,6 +2349,38 @@ async fn main() -> Result<()> {
}
let _ = save_last_sandbox(&ctx.name, &name);
}
SandboxCommands::Exec {
name,
workdir,
timeout,
tty,
no_tty,
command,
} => {
let name = resolve_sandbox_name(name, &ctx.name)?;
// Resolve --tty / --no-tty into an Option<bool> override.
let tty_override = if no_tty {
Some(false)
} else if tty {
Some(true)
} else {
None // auto-detect
};
let exit_code = run::sandbox_exec_grpc(
endpoint,
&name,
&command,
workdir.as_deref(),
timeout,
tty_override,
&tls,
)
.await?;
let _ = save_last_sandbox(&ctx.name, &name);
if exit_code != 0 {
std::process::exit(exit_code);
}
}
SandboxCommands::SshConfig { name } => {
let name = resolve_sandbox_name(name, &ctx.name)?;
run::print_ssh_config(&ctx.name, &name);
Expand Down
116 changes: 113 additions & 3 deletions crates/openshell-cli/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ use openshell_bootstrap::{
use openshell_core::proto::{
ApproveAllDraftChunksRequest, ApproveDraftChunkRequest, ClearDraftChunksRequest,
CreateProviderRequest, CreateSandboxRequest, DeleteProviderRequest, DeleteSandboxRequest,
GetClusterInferenceRequest, GetDraftHistoryRequest, GetDraftPolicyRequest,
ExecSandboxRequest, GetClusterInferenceRequest, GetDraftHistoryRequest, GetDraftPolicyRequest,
GetGatewayConfigRequest, GetProviderRequest, GetSandboxConfigRequest, GetSandboxLogsRequest,
GetSandboxPolicyStatusRequest, GetSandboxRequest, HealthRequest, ListProvidersRequest,
ListSandboxPoliciesRequest, ListSandboxesRequest, PolicyStatus, Provider,
RejectDraftChunkRequest, Sandbox, SandboxPhase, SandboxPolicy, SandboxSpec, SandboxTemplate,
SetClusterInferenceRequest, SettingScope, SettingValue, UpdateConfigRequest,
UpdateProviderRequest, WatchSandboxRequest, setting_value,
UpdateProviderRequest, WatchSandboxRequest, exec_sandbox_event, setting_value,
};
use openshell_core::settings::{self, SettingValueKind};
use openshell_providers::{
ProviderRegistry, detect_provider_from_command, normalize_provider_type,
};
use owo_colors::OwoColorize;
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{IsTerminal, Write};
use std::io::{IsTerminal, Read, Write};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -2693,6 +2693,116 @@ pub async fn sandbox_get(server: &str, name: &str, tls: &TlsOptions) -> Result<(
Ok(())
}

/// Maximum stdin payload size (4 MiB). Prevents the CLI from reading unbounded
/// data into memory before the server rejects an oversized message.
const MAX_STDIN_PAYLOAD: usize = 4 * 1024 * 1024;

/// Execute a command in a running sandbox via gRPC, streaming output to the terminal.
///
/// Returns the remote command's exit code.
pub async fn sandbox_exec_grpc(
server: &str,
name: &str,
command: &[String],
workdir: Option<&str>,
timeout_seconds: u32,
tty_override: Option<bool>,
tls: &TlsOptions,
) -> Result<i32> {
let mut client = grpc_client(server, tls).await?;

// Resolve sandbox name to id.
let sandbox = client
.get_sandbox(GetSandboxRequest {
name: name.to_string(),
})
.await
.into_diagnostic()?
.into_inner()
.sandbox
.ok_or_else(|| miette::miette!("sandbox not found"))?;

// Verify the sandbox is ready before issuing the exec.
if SandboxPhase::try_from(sandbox.phase) != Ok(SandboxPhase::Ready) {
return Err(miette::miette!(
"sandbox '{}' is not ready (phase: {}); wait for it to reach Ready state",
name,
phase_name(sandbox.phase)
));
}

// Read stdin if piped (not a TTY), using spawn_blocking to avoid blocking
// the async runtime. Cap the read at MAX_STDIN_PAYLOAD + 1 so we never
// buffer more than the limit into memory.
let stdin_payload = if !std::io::stdin().is_terminal() {
tokio::task::spawn_blocking(|| {
let limit = (MAX_STDIN_PAYLOAD + 1) as u64;
let mut buf = Vec::new();
std::io::stdin()
.take(limit)
.read_to_end(&mut buf)
.into_diagnostic()?;
if buf.len() > MAX_STDIN_PAYLOAD {
return Err(miette::miette!(
"stdin payload exceeds {} byte limit; pipe smaller inputs or use `sandbox upload`",
MAX_STDIN_PAYLOAD
));
}
Ok(buf)
})
.await
.into_diagnostic()?? // first ? unwraps JoinError, second ? unwraps Result
} else {
Vec::new()
};

// Resolve TTY mode: explicit --tty / --no-tty wins, otherwise auto-detect.
let tty = tty_override
.unwrap_or_else(|| std::io::stdin().is_terminal() && std::io::stdout().is_terminal());

// Make the streaming gRPC call.
let mut stream = client
.exec_sandbox(ExecSandboxRequest {
sandbox_id: sandbox.id,
command: command.to_vec(),
workdir: workdir.unwrap_or_default().to_string(),
environment: HashMap::new(),
timeout_seconds,
stdin: stdin_payload,
tty,
})
.await
.into_diagnostic()?
.into_inner();

// Stream output to terminal in real-time.
let mut exit_code = 0i32;
let stdout = std::io::stdout();
let stderr = std::io::stderr();

while let Some(event) = stream.next().await {
let event = event.into_diagnostic()?;
match event.payload {
Some(exec_sandbox_event::Payload::Stdout(out)) => {
let mut handle = stdout.lock();
handle.write_all(&out.data).into_diagnostic()?;
handle.flush().into_diagnostic()?;
}
Some(exec_sandbox_event::Payload::Stderr(err)) => {
let mut handle = stderr.lock();
handle.write_all(&err.data).into_diagnostic()?;
handle.flush().into_diagnostic()?;
}
Some(exec_sandbox_event::Payload::Exit(exit)) => {
exit_code = exit.exit_code;
}
None => {}
}
}

Ok(exit_code)
}

/// Print a single YAML line with dimmed keys and regular values.
fn print_yaml_line(line: &str) {
// Find leading whitespace
Expand Down
29 changes: 27 additions & 2 deletions crates/openshell-server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ impl OpenShell for OpenShellService {
.map_err(|e| Status::invalid_argument(format!("command construction failed: {e}")))?;
let stdin_payload = req.stdin;
let timeout_seconds = req.timeout_seconds;
let request_tty = req.tty;
let sandbox_id = sandbox.id;
let handshake_secret = self.state.config.ssh_handshake_secret.clone();

Expand All @@ -1056,6 +1057,7 @@ impl OpenShell for OpenShellService {
&command_str,
stdin_payload,
timeout_seconds,
request_tty,
&handshake_secret,
)
.await
Expand Down Expand Up @@ -3716,6 +3718,7 @@ async fn stream_exec_over_ssh(
command: &str,
stdin_payload: Vec<u8>,
timeout_seconds: u32,
request_tty: bool,
handshake_secret: &str,
) -> Result<(), Status> {
let command_preview: String = command.chars().take(120).collect();
Expand Down Expand Up @@ -3764,8 +3767,13 @@ async fn stream_exec_over_ssh(
}
};

let exec =
run_exec_with_russh(local_proxy_port, command, stdin_payload.clone(), tx.clone());
let exec = run_exec_with_russh(
local_proxy_port,
command,
stdin_payload.clone(),
request_tty,
tx.clone(),
);

let exec_result = if timeout_seconds == 0 {
exec.await
Expand Down Expand Up @@ -3843,6 +3851,7 @@ async fn run_exec_with_russh(
local_proxy_port: u16,
command: &str,
stdin_payload: Vec<u8>,
request_tty: bool,
tx: mpsc::Sender<Result<ExecSandboxEvent, Status>>,
) -> Result<i32, Status> {
// Defense-in-depth: validate command at the transport boundary even though
Expand Down Expand Up @@ -3886,6 +3895,22 @@ async fn run_exec_with_russh(
.await
.map_err(|e| Status::internal(format!("failed to open ssh channel: {e}")))?;

// Request a PTY before exec when the client asked for terminal allocation.
if request_tty {
channel
.request_pty(
false,
"xterm-256color",
0, // col_width — 0 lets the server decide
0, // row_height — 0 lets the server decide
0, // pix_width
0, // pix_height
&[],
)
.await
.map_err(|e| Status::internal(format!("failed to allocate PTY: {e}")))?;
}

channel
.exec(true, command.as_bytes())
.await
Expand Down
3 changes: 3 additions & 0 deletions proto/openshell.proto
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ message ExecSandboxRequest {

// Optional stdin payload passed to the command.
bytes stdin = 6;

// Request a pseudo-terminal for the remote command.
bool tty = 7;
}

// One stdout chunk from a sandbox exec.
Expand Down
Loading