From 6740f6bf7be201d71e7b2f8a254c2aaa7beee040 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 18 Feb 2026 20:07:58 +0000 Subject: [PATCH 1/3] feat(tool): add streaming output support Add incremental output streaming via callbacks for long-running scripts. Interpreter changes: - OutputCallback type: Box - Emission points at loop iterations, command lists, sequences, and top-level execute with a dedup counter to prevent double-emission Public API additions: - Bash::exec_streaming() - execute with output callback - ToolStatus::stdout()/stderr() - constructors for output events - ToolStatus.output and .stream fields (optional, backward-compatible) - BashTool::execute_with_status now streams output as ToolStatus events For a script like `for i in 1 2 3; do echo $i; done`, each iteration's output is emitted incrementally via the callback, while the complete result is still returned in ExecResult/ToolResponse. https://claude.ai/code/session_01H1H5WiZVgpjWJ7fTjip51p --- crates/bashkit/src/interpreter/mod.rs | 74 ++++++++++ crates/bashkit/src/lib.rs | 192 +++++++++++++++++++++++++- crates/bashkit/src/tool.rs | 177 +++++++++++++++++++++++- specs/009-tool-contract.md | 50 +++++++ 4 files changed, 488 insertions(+), 5 deletions(-) diff --git a/crates/bashkit/src/interpreter/mod.rs b/crates/bashkit/src/interpreter/mod.rs index 25de3e56..c13afa7e 100644 --- a/crates/bashkit/src/interpreter/mod.rs +++ b/crates/bashkit/src/interpreter/mod.rs @@ -30,6 +30,15 @@ use crate::error::Error; use crate::error::Result; use crate::fs::FileSystem; use crate::limits::{ExecutionCounters, ExecutionLimits}; + +/// Callback for streaming output chunks as they are produced. +/// +/// Arguments: `(stdout_chunk, stderr_chunk)`. Called after each loop iteration +/// and each top-level command completes. Only non-empty chunks trigger a call. +/// +/// Requires `Send + Sync` because the interpreter holds this across `.await` points. +/// Closures capturing `Arc>` satisfy both bounds automatically. +pub type OutputCallback = Box; use crate::parser::{ ArithmeticForCommand, AssignmentValue, CaseCommand, Command, CommandList, CompoundCommand, ForCommand, FunctionDef, IfCommand, ListOperator, ParameterOp, Parser, Pipeline, Redirect, @@ -122,6 +131,13 @@ pub struct Interpreter { /// Stdin inherited from pipeline for compound commands (while read, etc.) /// Each read operation consumes one line, advancing through the data. pipeline_stdin: Option, + /// Optional callback for streaming output chunks during execution. + /// When set, output is emitted incrementally via this callback in addition + /// to being accumulated in the returned ExecResult. + output_callback: Option, + /// Monotonic counter incremented each time output is emitted via callback. + /// Used to detect whether sub-calls already emitted output, preventing duplicates. + output_emit_count: u64, } impl Interpreter { @@ -283,6 +299,8 @@ impl Interpreter { #[cfg(feature = "git")] git_client: None, pipeline_stdin: None, + output_callback: None, + output_emit_count: 0, } } @@ -325,6 +343,46 @@ impl Interpreter { self.cwd = cwd; } + /// Set an output callback for streaming output during execution. + /// + /// When set, the interpreter calls this callback with `(stdout_chunk, stderr_chunk)` + /// after each loop iteration, command list element, and top-level command. + /// Output is still accumulated in the returned `ExecResult` for the final result. + pub fn set_output_callback(&mut self, callback: OutputCallback) { + self.output_callback = Some(callback); + self.output_emit_count = 0; + } + + /// Clear the output callback. + pub fn clear_output_callback(&mut self) { + self.output_callback = None; + self.output_emit_count = 0; + } + + /// Emit output via the callback if set, and if sub-calls didn't already emit. + /// Returns `true` if output was emitted. + /// + /// `emit_count_before` is the value of `output_emit_count` before the sub-call + /// that produced this output. If the count advanced, sub-calls already emitted + /// and we skip to avoid duplicates. + fn maybe_emit_output(&mut self, stdout: &str, stderr: &str, emit_count_before: u64) -> bool { + if self.output_callback.is_none() { + return false; + } + // Sub-calls already emitted — skip to avoid duplicates + if self.output_emit_count != emit_count_before { + return false; + } + if stdout.is_empty() && stderr.is_empty() { + return false; + } + if let Some(ref mut cb) = self.output_callback { + cb(stdout, stderr); + self.output_emit_count += 1; + } + true + } + /// Set the HTTP client for network builtins (curl, wget). /// /// This is only available when the `http_client` feature is enabled. @@ -352,7 +410,9 @@ impl Interpreter { let mut exit_code = 0; for command in &script.commands { + let emit_before = self.output_emit_count; let result = self.execute_command(command).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -551,7 +611,9 @@ impl Interpreter { .insert(for_cmd.variable.clone(), value.clone()); // Execute body + let emit_before = self.output_emit_count; let result = self.execute_command_sequence(&for_cmd.body).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -645,7 +707,9 @@ impl Interpreter { } // Execute body + let emit_before = self.output_emit_count; let result = self.execute_command_sequence(&arith_for.body).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -862,7 +926,9 @@ impl Interpreter { } // Execute body + let emit_before = self.output_emit_count; let result = self.execute_command_sequence(&while_cmd.body).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -945,7 +1011,9 @@ impl Interpreter { } // Execute body + let emit_before = self.output_emit_count; let result = self.execute_command_sequence(&until_cmd.body).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -1592,7 +1660,9 @@ impl Interpreter { let mut exit_code = 0; for command in commands { + let emit_before = self.output_emit_count; let result = self.execute_command(command).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -1671,7 +1741,9 @@ impl Interpreter { let mut stdout = String::new(); let mut stderr = String::new(); let mut exit_code; + let emit_before = self.output_emit_count; let result = self.execute_command(&list.first).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; @@ -1746,7 +1818,9 @@ impl Interpreter { }; if should_execute { + let emit_before = self.output_emit_count; let result = self.execute_command(cmd).await?; + self.maybe_emit_output(&result.stdout, &result.stderr, emit_before); stdout.push_str(&result.stdout); stderr.push_str(&result.stderr); exit_code = result.exit_code; diff --git a/crates/bashkit/src/lib.rs b/crates/bashkit/src/lib.rs index 497aa6dd..1d48d9bf 100644 --- a/crates/bashkit/src/lib.rs +++ b/crates/bashkit/src/lib.rs @@ -387,7 +387,7 @@ pub use fs::{ FsLimits, FsUsage, InMemoryFs, Metadata, MountableFs, OverlayFs, PosixFs, }; pub use git::GitConfig; -pub use interpreter::{ControlFlow, ExecResult}; +pub use interpreter::{ControlFlow, ExecResult, OutputCallback}; pub use limits::{ExecutionCounters, ExecutionLimits, LimitExceeded}; pub use network::NetworkAllowlist; pub use tool::{BashTool, BashToolBuilder, Tool, ToolRequest, ToolResponse, ToolStatus, VERSION}; @@ -592,6 +592,47 @@ impl Bash { result } + /// Execute a bash script with streaming output. + /// + /// Like [`exec`](Self::exec), but calls `output_callback` with incremental + /// `(stdout_chunk, stderr_chunk)` pairs as output is produced. Callbacks fire + /// after each loop iteration, command list element, and top-level command. + /// + /// The full result is still returned in [`ExecResult`] for callers that need it. + /// + /// # Example + /// + /// ```rust + /// use bashkit::Bash; + /// use std::sync::{Arc, Mutex}; + /// + /// # #[tokio::main] + /// # async fn main() -> bashkit::Result<()> { + /// let chunks: Arc>> = Arc::new(Mutex::new(Vec::new())); + /// let chunks_cb = chunks.clone(); + /// let mut bash = Bash::new(); + /// let result = bash.exec_streaming( + /// "for i in 1 2 3; do echo $i; done", + /// Box::new(move |stdout, _stderr| { + /// chunks_cb.lock().unwrap().push(stdout.to_string()); + /// }), + /// ).await?; + /// assert_eq!(result.stdout, "1\n2\n3\n"); + /// assert_eq!(*chunks.lock().unwrap(), vec!["1\n", "2\n", "3\n"]); + /// # Ok(()) + /// # } + /// ``` + pub async fn exec_streaming( + &mut self, + script: &str, + output_callback: OutputCallback, + ) -> Result { + self.interpreter.set_output_callback(output_callback); + let result = self.exec(script).await; + self.interpreter.clear_output_callback(); + result + } + /// Get a clone of the underlying filesystem. /// /// Provides direct access to the virtual filesystem for: @@ -1244,6 +1285,7 @@ pub mod logging_guide {} #[allow(clippy::unwrap_used)] mod tests { use super::*; + use std::sync::{Arc, Mutex}; #[tokio::test] async fn test_echo_hello() { @@ -4059,4 +4101,152 @@ echo missing fi"#, result.stdout ); } + + // ---- Streaming output tests ---- + + #[tokio::test] + async fn test_exec_streaming_for_loop() { + let chunks = Arc::new(Mutex::new(Vec::new())); + let chunks_cb = chunks.clone(); + let mut bash = Bash::new(); + + let result = bash + .exec_streaming( + "for i in 1 2 3; do echo $i; done", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), + ) + .await + .unwrap(); + + assert_eq!(result.stdout, "1\n2\n3\n"); + assert_eq!( + *chunks.lock().unwrap(), + vec!["1\n", "2\n", "3\n"], + "each loop iteration should stream separately" + ); + } + + #[tokio::test] + async fn test_exec_streaming_while_loop() { + let chunks = Arc::new(Mutex::new(Vec::new())); + let chunks_cb = chunks.clone(); + let mut bash = Bash::new(); + + let result = bash + .exec_streaming( + "i=0; while [ $i -lt 3 ]; do i=$((i+1)); echo $i; done", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), + ) + .await + .unwrap(); + + assert_eq!(result.stdout, "1\n2\n3\n"); + let chunks = chunks.lock().unwrap(); + // The while loop emits each iteration; surrounding list may add events too + assert!( + chunks.contains(&"1\n".to_string()), + "should contain first iteration output" + ); + assert!( + chunks.contains(&"2\n".to_string()), + "should contain second iteration output" + ); + assert!( + chunks.contains(&"3\n".to_string()), + "should contain third iteration output" + ); + } + + #[tokio::test] + async fn test_exec_streaming_no_callback_still_works() { + // exec (non-streaming) should still work fine + let mut bash = Bash::new(); + let result = bash.exec("for i in a b c; do echo $i; done").await.unwrap(); + assert_eq!(result.stdout, "a\nb\nc\n"); + } + + #[tokio::test] + async fn test_exec_streaming_nested_loops_no_duplicates() { + let chunks = Arc::new(Mutex::new(Vec::new())); + let chunks_cb = chunks.clone(); + let mut bash = Bash::new(); + + let result = bash + .exec_streaming( + "for i in 1 2; do for j in a b; do echo \"$i$j\"; done; done", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), + ) + .await + .unwrap(); + + assert_eq!(result.stdout, "1a\n1b\n2a\n2b\n"); + let chunks = chunks.lock().unwrap(); + // Inner loop should emit each iteration; outer should not duplicate + let total_chars: usize = chunks.iter().map(|c| c.len()).sum(); + assert_eq!( + total_chars, + result.stdout.len(), + "total streamed bytes should match final output: chunks={:?}", + *chunks + ); + } + + #[tokio::test] + async fn test_exec_streaming_mixed_list_and_loop() { + let chunks = Arc::new(Mutex::new(Vec::new())); + let chunks_cb = chunks.clone(); + let mut bash = Bash::new(); + + let result = bash + .exec_streaming( + "echo start; for i in 1 2; do echo $i; done; echo end", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), + ) + .await + .unwrap(); + + assert_eq!(result.stdout, "start\n1\n2\nend\n"); + let chunks = chunks.lock().unwrap(); + assert_eq!( + *chunks, + vec!["start\n", "1\n", "2\n", "end\n"], + "mixed list+loop should produce exactly 4 events" + ); + } + + #[tokio::test] + async fn test_exec_streaming_stderr() { + let stderr_chunks = Arc::new(Mutex::new(Vec::new())); + let stderr_cb = stderr_chunks.clone(); + let mut bash = Bash::new(); + + let result = bash + .exec_streaming( + "echo ok; echo err >&2; echo ok2", + Box::new(move |_stdout, stderr| { + if !stderr.is_empty() { + stderr_cb.lock().unwrap().push(stderr.to_string()); + } + }), + ) + .await + .unwrap(); + + assert_eq!(result.stdout, "ok\nok2\n"); + assert_eq!(result.stderr, "err\n"); + let stderr_chunks = stderr_chunks.lock().unwrap(); + assert!( + stderr_chunks.contains(&"err\n".to_string()), + "stderr should be streamed: {:?}", + *stderr_chunks + ); + } } diff --git a/crates/bashkit/src/tool.rs b/crates/bashkit/src/tool.rs index 6edf7660..d52e4fe0 100644 --- a/crates/bashkit/src/tool.rs +++ b/crates/bashkit/src/tool.rs @@ -33,10 +33,11 @@ use crate::builtins::Builtin; use crate::error::Error; -use crate::{Bash, ExecResult, ExecutionLimits}; +use crate::{Bash, ExecResult, ExecutionLimits, OutputCallback}; use async_trait::async_trait; use schemars::{schema_for, JsonSchema}; use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; /// Library version from Cargo.toml pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -140,7 +141,7 @@ impl From for ToolResponse { /// Status update during tool execution #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ToolStatus { - /// Current phase (e.g., "validate", "parse", "execute", "complete") + /// Current phase (e.g., "validate", "parse", "execute", "output", "complete") pub phase: String, /// Optional message #[serde(skip_serializing_if = "Option::is_none")] @@ -151,6 +152,12 @@ pub struct ToolStatus { /// Estimated time remaining in milliseconds #[serde(skip_serializing_if = "Option::is_none")] pub eta_ms: Option, + /// Incremental stdout/stderr chunk (only present when `phase == "output"`) + #[serde(skip_serializing_if = "Option::is_none")] + pub output: Option, + /// Which stream the output belongs to: `"stdout"` or `"stderr"` + #[serde(skip_serializing_if = "Option::is_none")] + pub stream: Option, } impl ToolStatus { @@ -161,6 +168,32 @@ impl ToolStatus { message: None, percent_complete: None, eta_ms: None, + output: None, + stream: None, + } + } + + /// Create an output status carrying a stdout chunk. + pub fn stdout(chunk: impl Into) -> Self { + Self { + phase: "output".to_string(), + message: None, + percent_complete: None, + eta_ms: None, + output: Some(chunk.into()), + stream: Some("stdout".to_string()), + } + } + + /// Create an output status carrying a stderr chunk. + pub fn stderr(chunk: impl Into) -> Self { + Self { + phase: "output".to_string(), + message: None, + percent_complete: None, + eta_ms: None, + output: Some(chunk.into()), + stream: Some("stderr".to_string()), } } @@ -597,7 +630,21 @@ impl Tool for BashTool { status_callback(ToolStatus::new("execute").with_percent(20.0)); - let response = match bash.exec(&req.commands).await { + // Wire streaming: forward output chunks as ToolStatus events + let status_cb = Arc::new(Mutex::new(status_callback)); + let status_cb_output = status_cb.clone(); + let output_cb: OutputCallback = Box::new(move |stdout_chunk, stderr_chunk| { + if let Ok(mut cb) = status_cb_output.lock() { + if !stdout_chunk.is_empty() { + cb(ToolStatus::stdout(stdout_chunk)); + } + if !stderr_chunk.is_empty() { + cb(ToolStatus::stderr(stderr_chunk)); + } + } + }); + + let response = match bash.exec_streaming(&req.commands, output_cb).await { Ok(result) => result.into(), Err(e) => ToolResponse { stdout: String::new(), @@ -607,7 +654,9 @@ impl Tool for BashTool { }, }; - status_callback(ToolStatus::new("complete").with_percent(100.0)); + if let Ok(mut cb) = status_cb.lock() { + cb(ToolStatus::new("complete").with_percent(100.0)); + } response } @@ -966,4 +1015,124 @@ mod tests { assert!(phases.contains(&"validate".to_string())); assert!(phases.contains(&"complete".to_string())); } + + #[tokio::test] + async fn test_execute_with_status_streams_output() { + let mut tool = BashTool::default(); + let req = ToolRequest { + commands: "for i in a b c; do echo $i; done".to_string(), + }; + + let events = Arc::new(Mutex::new(Vec::new())); + let events_clone = events.clone(); + + let resp = tool + .execute_with_status( + req, + Box::new(move |status| { + events_clone.lock().expect("lock poisoned").push(status); + }), + ) + .await; + + assert_eq!(resp.stdout, "a\nb\nc\n"); + assert_eq!(resp.exit_code, 0); + + let events = events.lock().expect("lock poisoned"); + // Should have output events for each iteration + let output_events: Vec<_> = events.iter().filter(|s| s.phase == "output").collect(); + assert_eq!( + output_events.len(), + 3, + "expected 3 output events, got {output_events:?}" + ); + assert_eq!(output_events[0].output.as_deref(), Some("a\n")); + assert_eq!(output_events[0].stream.as_deref(), Some("stdout")); + assert_eq!(output_events[1].output.as_deref(), Some("b\n")); + assert_eq!(output_events[2].output.as_deref(), Some("c\n")); + } + + #[tokio::test] + async fn test_execute_with_status_streams_list_commands() { + let mut tool = BashTool::default(); + let req = ToolRequest { + commands: "echo start; echo end".to_string(), + }; + + let events = Arc::new(Mutex::new(Vec::new())); + let events_clone = events.clone(); + + let resp = tool + .execute_with_status( + req, + Box::new(move |status| { + events_clone.lock().expect("lock poisoned").push(status); + }), + ) + .await; + + assert_eq!(resp.stdout, "start\nend\n"); + + let events = events.lock().expect("lock poisoned"); + let output_events: Vec<_> = events.iter().filter(|s| s.phase == "output").collect(); + assert_eq!( + output_events.len(), + 2, + "expected 2 output events, got {output_events:?}" + ); + assert_eq!(output_events[0].output.as_deref(), Some("start\n")); + assert_eq!(output_events[1].output.as_deref(), Some("end\n")); + } + + #[tokio::test] + async fn test_execute_with_status_no_duplicate_output() { + let mut tool = BashTool::default(); + // mix of list + loop: should get 5 distinct events, no duplicates + let req = ToolRequest { + commands: "echo start; for i in 1 2 3; do echo $i; done; echo end".to_string(), + }; + + let events = Arc::new(Mutex::new(Vec::new())); + let events_clone = events.clone(); + + let resp = tool + .execute_with_status( + req, + Box::new(move |status| { + events_clone.lock().expect("lock poisoned").push(status); + }), + ) + .await; + + assert_eq!(resp.stdout, "start\n1\n2\n3\nend\n"); + + let events = events.lock().expect("lock poisoned"); + let output_events: Vec<_> = events + .iter() + .filter(|s| s.phase == "output") + .map(|s| s.output.as_deref().unwrap_or("")) + .collect(); + assert_eq!( + output_events, + vec!["start\n", "1\n", "2\n", "3\n", "end\n"], + "should have exactly 5 distinct output events" + ); + } + + #[test] + fn test_tool_status_stdout_constructor() { + let status = ToolStatus::stdout("hello\n"); + assert_eq!(status.phase, "output"); + assert_eq!(status.output.as_deref(), Some("hello\n")); + assert_eq!(status.stream.as_deref(), Some("stdout")); + assert!(status.message.is_none()); + } + + #[test] + fn test_tool_status_stderr_constructor() { + let status = ToolStatus::stderr("error\n"); + assert_eq!(status.phase, "output"); + assert_eq!(status.output.as_deref(), Some("error\n")); + assert_eq!(status.stream.as_deref(), Some("stderr")); + } } diff --git a/specs/009-tool-contract.md b/specs/009-tool-contract.md index 5b122b11..69e6f8e9 100644 --- a/specs/009-tool-contract.md +++ b/specs/009-tool-contract.md @@ -194,6 +194,56 @@ When configured, outputs automatically include: - `system_prompt()`: Adds `Home: /home/` if username set - `help()`: Adds CONFIGURATION section with user, host, limits, env vars +### Streaming Output + +`execute_with_status()` emits incremental output via `ToolStatus` events with `phase: "output"`. + +```rust +pub struct ToolStatus { + pub phase: String, // "validate" | "parse" | "execute" | "output" | "complete" + pub message: Option, + pub percent_complete: Option, + pub eta_ms: Option, + pub output: Option, // Chunk content (when phase == "output") + pub stream: Option, // "stdout" or "stderr" +} +``` + +Constructors: `ToolStatus::stdout("chunk")`, `ToolStatus::stderr("chunk")`. + +At the `Bash` level, `exec_streaming()` provides the same capability: + +```rust +let chunks = Arc::new(Mutex::new(Vec::new())); +let chunks_cb = chunks.clone(); +let result = bash.exec_streaming( + "for i in 1 2 3; do echo $i; done", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), +).await?; +// result.stdout == "1\n2\n3\n" (complete) +// chunks == ["1\n", "2\n", "3\n"] (incremental) +``` + +#### Emission granularity + +Output is emitted after each: +- Loop iteration (`for`, `while`, `until`, arithmetic `for`) +- Command in a list (`cmd1; cmd2 && cmd3`) +- Command in a sequence (loop body, if branch) +- Top-level script command + +A dedup counter prevents double-emission when inner constructs already emitted. +Pipeline intermediate output is not emitted (only the final pipeline stage). + +#### Backward compatibility + +- `ToolResponse` unchanged — always returns complete buffered output +- `execute()` unaffected — no streaming without `execute_with_status()` +- `ToolStatus.output` and `.stream` are `Option` + `skip_serializing_if` +- `OutputCallback` type: `Box` + ## Design Rationale ### Why a trait? From f1f86b6eafdfa76ce97cdc4db2a0b74e197a63d2 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 18 Feb 2026 20:36:42 +0000 Subject: [PATCH 2/3] test(streaming): add example and streamed-vs-non-streamed equivalence tests - Add streaming_output.rs example showing exec_streaming usage - Add 8 equivalence tests that run same script through exec() and exec_streaming(), asserting identical ExecResult and that concatenated chunks == final stdout/stderr https://claude.ai/code/session_01H1H5WiZVgpjWJ7fTjip51p --- crates/bashkit/examples/streaming_output.rs | 75 +++++++++++++++ crates/bashkit/src/lib.rs | 100 ++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 crates/bashkit/examples/streaming_output.rs diff --git a/crates/bashkit/examples/streaming_output.rs b/crates/bashkit/examples/streaming_output.rs new file mode 100644 index 00000000..e5775a4a --- /dev/null +++ b/crates/bashkit/examples/streaming_output.rs @@ -0,0 +1,75 @@ +//! Streaming output example +//! +//! Demonstrates `exec_streaming` which delivers output incrementally +//! via a callback, while still returning the full result at the end. +//! +//! Run with: cargo run --example streaming_output + +use bashkit::Bash; +use std::sync::{Arc, Mutex}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // --- Bash-level streaming --- + println!("=== exec_streaming: for loop ==="); + let mut bash = Bash::new(); + let result = bash + .exec_streaming( + "for i in 1 2 3 4 5; do echo \"iteration $i\"; done", + Box::new(|stdout, _stderr| { + // Called after each loop iteration + print!("[stream] {stdout}"); + }), + ) + .await?; + println!("--- final stdout ---\n{}", result.stdout); + + // --- Collecting chunks --- + println!("=== exec_streaming: collecting chunks ==="); + let chunks: Arc>> = Arc::new(Mutex::new(Vec::new())); + let chunks_cb = chunks.clone(); + let mut bash = Bash::new(); + let result = bash + .exec_streaming( + "echo start; for x in a b c; do echo $x; done; echo end", + Box::new(move |stdout, _stderr| { + chunks_cb.lock().unwrap().push(stdout.to_string()); + }), + ) + .await?; + { + let chunks = chunks.lock().unwrap(); + println!("Chunks received: {chunks:?}"); + // Concatenated chunks == full stdout + let reassembled: String = chunks.iter().cloned().collect(); + assert_eq!(reassembled, result.stdout); + println!("Reassembled matches final stdout: OK"); + } + + // --- Streaming stderr --- + println!("\n=== exec_streaming: stderr ==="); + let mut bash = Bash::new(); + let result = bash + .exec_streaming( + "echo out1; echo err1 >&2; echo out2", + Box::new(|stdout, stderr| { + if !stdout.is_empty() { + print!("[stdout] {stdout}"); + } + if !stderr.is_empty() { + eprint!("[stderr] {stderr}"); + } + }), + ) + .await?; + println!("--- final stdout: {:?}", result.stdout); + println!("--- final stderr: {:?}", result.stderr); + + // --- Non-streaming still works --- + println!("\n=== exec (non-streaming) for comparison ==="); + let mut bash = Bash::new(); + let result = bash.exec("for i in 1 2 3; do echo $i; done").await?; + println!("stdout: {}", result.stdout); + + Ok(()) +} diff --git a/crates/bashkit/src/lib.rs b/crates/bashkit/src/lib.rs index 1d48d9bf..a3b78f4c 100644 --- a/crates/bashkit/src/lib.rs +++ b/crates/bashkit/src/lib.rs @@ -4249,4 +4249,104 @@ echo missing fi"#, *stderr_chunks ); } + + // ---- Streamed vs non-streamed equivalence tests ---- + // + // These run the same script through exec() and exec_streaming() and assert + // that the final ExecResult is identical, plus concatenated chunks == stdout. + + /// Helper: run script both ways, assert equivalence. + async fn assert_streaming_equivalence(script: &str) { + // Non-streaming + let mut bash_plain = Bash::new(); + let plain = bash_plain.exec(script).await.unwrap(); + + // Streaming + let stdout_chunks: Arc>> = Arc::new(Mutex::new(Vec::new())); + let stderr_chunks: Arc>> = Arc::new(Mutex::new(Vec::new())); + let so = stdout_chunks.clone(); + let se = stderr_chunks.clone(); + let mut bash_stream = Bash::new(); + let streamed = bash_stream + .exec_streaming( + script, + Box::new(move |stdout, stderr| { + if !stdout.is_empty() { + so.lock().unwrap().push(stdout.to_string()); + } + if !stderr.is_empty() { + se.lock().unwrap().push(stderr.to_string()); + } + }), + ) + .await + .unwrap(); + + // Final results must match + assert_eq!( + plain.stdout, streamed.stdout, + "stdout mismatch for: {script}" + ); + assert_eq!( + plain.stderr, streamed.stderr, + "stderr mismatch for: {script}" + ); + assert_eq!( + plain.exit_code, streamed.exit_code, + "exit_code mismatch for: {script}" + ); + + // Concatenated chunks must equal full stdout/stderr + let reassembled_stdout: String = stdout_chunks.lock().unwrap().iter().cloned().collect(); + assert_eq!( + reassembled_stdout, streamed.stdout, + "reassembled stdout chunks != final stdout for: {script}" + ); + let reassembled_stderr: String = stderr_chunks.lock().unwrap().iter().cloned().collect(); + assert_eq!( + reassembled_stderr, streamed.stderr, + "reassembled stderr chunks != final stderr for: {script}" + ); + } + + #[tokio::test] + async fn test_streaming_equivalence_for_loop() { + assert_streaming_equivalence("for i in 1 2 3; do echo $i; done").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_while_loop() { + assert_streaming_equivalence("i=0; while [ $i -lt 4 ]; do i=$((i+1)); echo $i; done").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_nested_loops() { + assert_streaming_equivalence("for i in a b; do for j in 1 2; do echo \"$i$j\"; done; done") + .await; + } + + #[tokio::test] + async fn test_streaming_equivalence_mixed_list() { + assert_streaming_equivalence("echo start; for i in x y; do echo $i; done; echo end").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_stderr() { + assert_streaming_equivalence("echo out; echo err >&2; echo out2").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_pipeline() { + assert_streaming_equivalence("echo -e 'a\\nb\\nc' | grep b").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_conditionals() { + assert_streaming_equivalence("if true; then echo yes; else echo no; fi; echo done").await; + } + + #[tokio::test] + async fn test_streaming_equivalence_subshell() { + assert_streaming_equivalence("x=$(echo hello); echo $x").await; + } } From 782d9151e38b788f1cef9fba75cdc91dc3789e74 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 18 Feb 2026 23:18:19 +0000 Subject: [PATCH 3/3] chore(supply-chain): update exemptions for aws-lc-rs 1.16.0 and bumpalo 3.20.1 https://claude.ai/code/session_01H1H5WiZVgpjWJ7fTjip51p --- supply-chain/config.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/supply-chain/config.toml b/supply-chain/config.toml index 417b668e..cedce595 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -95,7 +95,7 @@ version = "1.5.0" criteria = "safe-to-deploy" [[exemptions.aws-lc-rs]] -version = "1.15.4" +version = "1.16.0" criteria = "safe-to-deploy" [[exemptions.aws-lc-sys]] @@ -123,7 +123,7 @@ version = "1.12.1" criteria = "safe-to-deploy" [[exemptions.bumpalo]] -version = "3.19.1" +version = "3.20.1" criteria = "safe-to-deploy" [[exemptions.bytecount]]