Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions crates/bashkit/examples/streaming_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Streaming output example
//!
//! Demonstrates `exec_streaming` which delivers output incrementally
//! via a callback, while still returning the full result at the end.
//!
//! Run with: cargo run --example streaming_output

use bashkit::Bash;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// --- Bash-level streaming ---
println!("=== exec_streaming: for loop ===");
let mut bash = Bash::new();
let result = bash
.exec_streaming(
"for i in 1 2 3 4 5; do echo \"iteration $i\"; done",
Box::new(|stdout, _stderr| {
// Called after each loop iteration
print!("[stream] {stdout}");
}),
)
.await?;
println!("--- final stdout ---\n{}", result.stdout);

// --- Collecting chunks ---
println!("=== exec_streaming: collecting chunks ===");
let chunks: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let chunks_cb = chunks.clone();
let mut bash = Bash::new();
let result = bash
.exec_streaming(
"echo start; for x in a b c; do echo $x; done; echo end",
Box::new(move |stdout, _stderr| {
chunks_cb.lock().unwrap().push(stdout.to_string());
}),
)
.await?;
{
let chunks = chunks.lock().unwrap();
println!("Chunks received: {chunks:?}");
// Concatenated chunks == full stdout
let reassembled: String = chunks.iter().cloned().collect();
assert_eq!(reassembled, result.stdout);
println!("Reassembled matches final stdout: OK");
}

// --- Streaming stderr ---
println!("\n=== exec_streaming: stderr ===");
let mut bash = Bash::new();
let result = bash
.exec_streaming(
"echo out1; echo err1 >&2; echo out2",
Box::new(|stdout, stderr| {
if !stdout.is_empty() {
print!("[stdout] {stdout}");
}
if !stderr.is_empty() {
eprint!("[stderr] {stderr}");
}
}),
)
.await?;
println!("--- final stdout: {:?}", result.stdout);
println!("--- final stderr: {:?}", result.stderr);

// --- Non-streaming still works ---
println!("\n=== exec (non-streaming) for comparison ===");
let mut bash = Bash::new();
let result = bash.exec("for i in 1 2 3; do echo $i; done").await?;
println!("stdout: {}", result.stdout);

Ok(())
}
74 changes: 74 additions & 0 deletions crates/bashkit/src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ use crate::error::Error;
use crate::error::Result;
use crate::fs::FileSystem;
use crate::limits::{ExecutionCounters, ExecutionLimits};

/// Callback for streaming output chunks as they are produced.
///
/// Arguments: `(stdout_chunk, stderr_chunk)`. Called after each loop iteration
/// and each top-level command completes. Only non-empty chunks trigger a call.
///
/// Requires `Send + Sync` because the interpreter holds this across `.await` points.
/// Closures capturing `Arc<Mutex<_>>` satisfy both bounds automatically.
pub type OutputCallback = Box<dyn FnMut(&str, &str) + Send + Sync>;
use crate::parser::{
ArithmeticForCommand, AssignmentValue, CaseCommand, Command, CommandList, CompoundCommand,
ForCommand, FunctionDef, IfCommand, ListOperator, ParameterOp, Parser, Pipeline, Redirect,
Expand Down Expand Up @@ -122,6 +131,13 @@ pub struct Interpreter {
/// Stdin inherited from pipeline for compound commands (while read, etc.)
/// Each read operation consumes one line, advancing through the data.
pipeline_stdin: Option<String>,
/// Optional callback for streaming output chunks during execution.
/// When set, output is emitted incrementally via this callback in addition
/// to being accumulated in the returned ExecResult.
output_callback: Option<OutputCallback>,
/// Monotonic counter incremented each time output is emitted via callback.
/// Used to detect whether sub-calls already emitted output, preventing duplicates.
output_emit_count: u64,
}

impl Interpreter {
Expand Down Expand Up @@ -283,6 +299,8 @@ impl Interpreter {
#[cfg(feature = "git")]
git_client: None,
pipeline_stdin: None,
output_callback: None,
output_emit_count: 0,
}
}

Expand Down Expand Up @@ -325,6 +343,46 @@ impl Interpreter {
self.cwd = cwd;
}

/// Set an output callback for streaming output during execution.
///
/// When set, the interpreter calls this callback with `(stdout_chunk, stderr_chunk)`
/// after each loop iteration, command list element, and top-level command.
/// Output is still accumulated in the returned `ExecResult` for the final result.
pub fn set_output_callback(&mut self, callback: OutputCallback) {
self.output_callback = Some(callback);
self.output_emit_count = 0;
}

/// Clear the output callback.
pub fn clear_output_callback(&mut self) {
self.output_callback = None;
self.output_emit_count = 0;
}

/// Emit output via the callback if set, and if sub-calls didn't already emit.
/// Returns `true` if output was emitted.
///
/// `emit_count_before` is the value of `output_emit_count` before the sub-call
/// that produced this output. If the count advanced, sub-calls already emitted
/// and we skip to avoid duplicates.
fn maybe_emit_output(&mut self, stdout: &str, stderr: &str, emit_count_before: u64) -> bool {
if self.output_callback.is_none() {
return false;
}
// Sub-calls already emitted — skip to avoid duplicates
if self.output_emit_count != emit_count_before {
return false;
}
if stdout.is_empty() && stderr.is_empty() {
return false;
}
if let Some(ref mut cb) = self.output_callback {
cb(stdout, stderr);
self.output_emit_count += 1;
}
true
}

/// Set the HTTP client for network builtins (curl, wget).
///
/// This is only available when the `http_client` feature is enabled.
Expand Down Expand Up @@ -352,7 +410,9 @@ impl Interpreter {
let mut exit_code = 0;

for command in &script.commands {
let emit_before = self.output_emit_count;
let result = self.execute_command(command).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -551,7 +611,9 @@ impl Interpreter {
.insert(for_cmd.variable.clone(), value.clone());

// Execute body
let emit_before = self.output_emit_count;
let result = self.execute_command_sequence(&for_cmd.body).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -645,7 +707,9 @@ impl Interpreter {
}

// Execute body
let emit_before = self.output_emit_count;
let result = self.execute_command_sequence(&arith_for.body).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -862,7 +926,9 @@ impl Interpreter {
}

// Execute body
let emit_before = self.output_emit_count;
let result = self.execute_command_sequence(&while_cmd.body).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -945,7 +1011,9 @@ impl Interpreter {
}

// Execute body
let emit_before = self.output_emit_count;
let result = self.execute_command_sequence(&until_cmd.body).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -1592,7 +1660,9 @@ impl Interpreter {
let mut exit_code = 0;

for command in commands {
let emit_before = self.output_emit_count;
let result = self.execute_command(command).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -1671,7 +1741,9 @@ impl Interpreter {
let mut stdout = String::new();
let mut stderr = String::new();
let mut exit_code;
let emit_before = self.output_emit_count;
let result = self.execute_command(&list.first).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down Expand Up @@ -1746,7 +1818,9 @@ impl Interpreter {
};

if should_execute {
let emit_before = self.output_emit_count;
let result = self.execute_command(cmd).await?;
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
stdout.push_str(&result.stdout);
stderr.push_str(&result.stderr);
exit_code = result.exit_code;
Expand Down
Loading
Loading