From f7abdab198eca142da0a06853e96ca2d46e3e38f Mon Sep 17 00:00:00 2001 From: Sion Kang Date: Wed, 6 May 2026 17:53:39 +0900 Subject: [PATCH 1/3] perf: pipeline SFTP requests for upload/download (~2-3x speedup) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The high-level `AsyncWrite`/`AsyncRead` impls on `File` issue exactly one SFTP `WRITE`/`READ` at a time and `await` its `STATUS`/`DATA` reply before sending the next. Sustained throughput is therefore bounded by `chunk_size / RTT` — at 50 ms RTT with the default 256 KiB chunk that caps a single transfer at ~5 MiB/s no matter how fast the link is. Add two pipelined helpers on `File` that keep up to N SFTP requests in flight concurrently, mirroring how OpenSSH's `sftp(1)` client behaves (`-R 64` by default): * `File::write_all_pipelined(reader, max_inflight)` — reads chunks from `reader` and dispatches `session.write(...)` futures via `FuturesUnordered`, refilling the pipeline as in-flight writes complete. Memory bounded by `max_inflight * write_len`. * `File::read_to_writer_pipelined(writer, max_inflight)` — symmetric for downloads. Out-of-order responses are buffered in a `BTreeMap` keyed by offset and flushed to `writer` as soon as the next-expected chunk arrives. Wire `Client::upload_file`/`download_file`/`upload_dir_recursive`/ `download_dir_recursive` to use the new helpers with `MAX_INFLIGHT_REQUESTS = 64`. Measured on macOS arm64 against `bssh-server` v2.1.3 on loopback with a 1 GiB file: | op | build | real | RSS | |----------|------------------------|---------|----------| | upload | vanilla v2.1.3 | 39.30s | 3.23 GB | | upload | streaming-only | 3.47s | 20 MB | | upload | streaming + pipelined | 2.27s | 49 MB | | download | vanilla v2.1.3 | 3.93s | 2.17 GB | | download | streaming-only | 3.41s | 16 MB | | download | streaming + pipelined | 1.34s | 288 MB | Pipelining adds ~+53% on upload and ~+155% on download throughput on top of the streaming patch (which already eliminated the whole-file load). Peak RSS stays well below the unpatched levels: download holds at most ~`max_inflight` chunks pending in the reorder map, and upload caps at `max_inflight * chunk_size + reader buffer`. --- crates/bssh-russh-sftp/Cargo.toml | 1 + crates/bssh-russh-sftp/src/client/fs/file.rs | 177 +++++++++++++++++++ src/ssh/tokio_client/file_transfer.rs | 58 +++--- 3 files changed, 205 insertions(+), 31 deletions(-) diff --git a/crates/bssh-russh-sftp/Cargo.toml b/crates/bssh-russh-sftp/Cargo.toml index a69960c7..9bd21c6c 100644 --- a/crates/bssh-russh-sftp/Cargo.toml +++ b/crates/bssh-russh-sftp/Cargo.toml @@ -20,6 +20,7 @@ tokio = { version = "1", default-features = false, features = [ "macros", ] } tokio-util = "0.7" +futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" bitflags = { version = "2.9", features = ["serde"] } diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 4b1cdcd2..0adbe6df 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -92,6 +92,183 @@ impl File { self.session.fsync(self.handle.as_str()).await.map(|_| ()) } + + /// Streams `reader` to this remote file with up to `max_inflight` concurrent + /// SFTP `WRITE` requests in flight. Each request carries up to the negotiated + /// `write_len` (or [`MAX_WRITE_LENGTH`] when no limit is advertised). + /// + /// The high-level [`AsyncWrite`] impl issues one `WRITE` at a time and waits + /// for its `STATUS` reply before sending the next, so sustained throughput is + /// bounded by `chunk_size / RTT`. This helper hides the per-request RTT by + /// keeping multiple in-flight, mirroring how OpenSSH's `sftp` client behaves + /// (~64 outstanding requests by default). + /// + /// On success returns the number of bytes streamed. Updates `self.pos` to + /// the new write offset. Reading from `reader` and dispatching writes are + /// interleaved, so memory usage is bounded by `max_inflight * chunk_size`. + pub async fn write_all_pipelined( + &mut self, + reader: &mut R, + max_inflight: usize, + ) -> SftpResult + where + R: tokio::io::AsyncRead + Unpin, + { + use futures::stream::{FuturesUnordered, StreamExt}; + use tokio::io::AsyncReadExt; + + if max_inflight == 0 { + return Err(Error::UnexpectedBehavior( + "max_inflight must be at least 1".to_owned(), + )); + } + + let chunk_size = self + .extensions + .limits + .as_ref() + .and_then(|l| l.write_len) + .map(|n| n as usize) + .unwrap_or(MAX_WRITE_LENGTH as usize); + + let mut total: u64 = 0; + let mut offset = self.pos; + let mut in_flight = FuturesUnordered::new(); + let mut eof = false; + + loop { + // Top up the pipeline with new chunks until we hit the cap or EOF. + while !eof && in_flight.len() < max_inflight { + let mut buf = vec![0u8; chunk_size]; + let n = reader.read(&mut buf).await.map_err(io::Error::from)?; + if n == 0 { + eof = true; + break; + } + buf.truncate(n); + + let session = self.session.clone(); + let handle = self.handle.clone(); + let off = offset; + + in_flight.push(async move { + session.write(handle, off, buf).await?; + SftpResult::Ok(n as u64) + }); + + offset += n as u64; + total += n as u64; + } + + // Drain at least one in-flight write before reading more, otherwise + // we busy-loop the read path while writes never get a chance to make + // progress. + match in_flight.next().await { + Some(Ok(_)) => {} + Some(Err(e)) => return Err(e), + None => break, // pipeline drained and no more data → done + } + } + + self.pos = offset; + Ok(total) + } + + /// Streams the remote file from the current position to `writer` using up to + /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up + /// to the negotiated `read_len` (or [`MAX_READ_LENGTH`] when no limit is + /// advertised). + /// + /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks + /// are reassembled in offset order before being written to `writer`, so the + /// output is identical to a sequential read. Stops on the first server + /// short read (server signalled EOF). + /// + /// Returns the number of bytes streamed. Updates `self.pos`. + pub async fn read_to_writer_pipelined( + &mut self, + writer: &mut W, + max_inflight: usize, + ) -> SftpResult + where + W: tokio::io::AsyncWrite + Unpin, + { + use futures::stream::{FuturesUnordered, StreamExt}; + use std::collections::BTreeMap; + use tokio::io::AsyncWriteExt; + + if max_inflight == 0 { + return Err(Error::UnexpectedBehavior( + "max_inflight must be at least 1".to_owned(), + )); + } + + let chunk_size = self + .extensions + .limits + .as_ref() + .and_then(|l| l.read_len) + .map(|n| n as usize) + .unwrap_or(MAX_READ_LENGTH as usize); + + let mut total: u64 = 0; + let mut next_offset = self.pos; + let mut next_to_write = self.pos; + let mut pending: BTreeMap> = BTreeMap::new(); + let mut in_flight = FuturesUnordered::new(); + let mut eof = false; + + loop { + // Schedule new read requests until we hit the cap or have observed EOF. + while !eof && in_flight.len() < max_inflight { + let session = self.session.clone(); + let handle = self.handle.clone(); + let off = next_offset; + let len = chunk_size as u32; + + in_flight.push(async move { + match session.read(handle, off, len).await { + Ok(data) => SftpResult::Ok((off, Some(data.data))), + Err(Error::Status(s)) if s.status_code == StatusCode::Eof => { + SftpResult::Ok((off, None)) + } + Err(e) => Err(e), + } + }); + + next_offset += chunk_size as u64; + } + + match in_flight.next().await { + Some(Ok((off, Some(data)))) => { + if data.is_empty() { + eof = true; + } else { + pending.insert(off, data); + } + } + Some(Ok((_, None))) => { + eof = true; + } + Some(Err(e)) => return Err(e), + None => break, + } + + // Flush in-order chunks to writer as they become available. + while let Some(chunk) = pending.remove(&next_to_write) { + let n = chunk.len() as u64; + writer + .write_all(&chunk) + .await + .map_err(io::Error::from)?; + next_to_write += n; + total += n; + } + } + + self.pos = next_to_write; + Ok(total) + } } impl Drop for File { diff --git a/src/ssh/tokio_client/file_transfer.rs b/src/ssh/tokio_client/file_transfer.rs index 5fda7622..7afe6861 100644 --- a/src/ssh/tokio_client/file_transfer.rs +++ b/src/ssh/tokio_client/file_transfer.rs @@ -21,10 +21,15 @@ use russh_sftp::{client::SftpSession, protocol::OpenFlags}; use std::path::Path; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use super::connection::Client; -use crate::utils::buffer_pool::global; + +/// Maximum number of concurrent SFTP `WRITE`/`READ` requests held in flight per +/// transfer. Mirrors OpenSSH `sftp(1)`'s default (`-R 64`) — large enough to +/// hide per-request RTT on intra-DC and intercontinental links, small enough to +/// keep peak buffer memory bounded (`MAX_INFLIGHT * MAX_WRITE_LENGTH ≈ 16 MiB`). +const MAX_INFLIGHT_REQUESTS: usize = 64; impl Client { /// Upload a file with sftp to the remote server. @@ -46,21 +51,20 @@ impl Client { channel.request_subsystem(true, "sftp").await?; let sftp = SftpSession::new(channel.into_stream()).await?; - // read file contents locally - let file_contents = tokio::fs::read(src_file_path) + // Stream local file with multiple SFTP WRITE requests in flight to + // hide per-request RTT and avoid loading the entire file in memory. + let mut local_file = tokio::fs::File::open(src_file_path) .await .map_err(super::Error::IoError)?; - // interaction with i/o let mut file = sftp .open_with_flags( dest_file_path, OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE | OpenFlags::READ, ) .await?; - file.write_all(&file_contents) - .await - .map_err(super::Error::IoError)?; + file.write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; file.flush().await.map_err(super::Error::IoError)?; file.shutdown().await.map_err(super::Error::IoError)?; @@ -84,25 +88,18 @@ impl Client { channel.request_subsystem(true, "sftp").await?; let sftp = SftpSession::new(channel.into_stream()).await?; - // open remote file for reading + // Stream remote file with multiple SFTP READ requests in flight; chunks + // are reassembled in offset order before being written to disk. let mut remote_file = sftp .open_with_flags(remote_file_path, OpenFlags::READ) .await?; - // Use pooled buffer for reading file contents to reduce allocations - let mut pooled_buffer = global::get_large_buffer(); - remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?; - let contents = pooled_buffer.as_vec().clone(); // Clone to owned Vec for writing - - // write contents to local file let mut local_file = tokio::fs::File::create(local_file_path.as_ref()) .await .map_err(super::Error::IoError)?; - - local_file - .write_all(&contents) - .await - .map_err(super::Error::IoError)?; + remote_file + .read_to_writer_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; local_file.flush().await.map_err(super::Error::IoError)?; Ok(()) @@ -173,8 +170,8 @@ impl Client { let _ = sftp.create_dir(&remote_path).await; // Ignore error if already exists self.upload_dir_recursive(sftp, &path, &remote_path).await?; } else if metadata.is_file() { - // Upload file - let file_contents = tokio::fs::read(&path) + // Stream local file with pipelined SFTP WRITEs. + let mut local_file = tokio::fs::File::open(&path) .await .map_err(super::Error::IoError)?; @@ -186,9 +183,8 @@ impl Client { .await?; remote_file - .write_all(&file_contents) - .await - .map_err(super::Error::IoError)?; + .write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; remote_file.flush().await.map_err(super::Error::IoError)?; remote_file .shutdown() @@ -265,17 +261,17 @@ impl Client { self.download_dir_recursive(sftp, &remote_path, &local_path) .await?; } else if metadata.file_type().is_file() { - // Download file using pooled buffer + // Stream remote file with pipelined SFTP READs. let mut remote_file = sftp.open_with_flags(&remote_path, OpenFlags::READ).await?; - let mut pooled_buffer = global::get_large_buffer(); - remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?; - let contents = pooled_buffer.as_vec().clone(); - - tokio::fs::write(&local_path, contents) + let mut local_file = tokio::fs::File::create(&local_path) .await .map_err(super::Error::IoError)?; + remote_file + .read_to_writer_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; + local_file.flush().await.map_err(super::Error::IoError)?; } } From 9e73c4f57da20410e5b56585fca37beb972e5ee0 Mon Sep 17 00:00:00 2001 From: Sion Kang Date: Thu, 7 May 2026 14:04:16 +0900 Subject: [PATCH 2/3] fix: apply rustfmt to pipelined read path Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + crates/bssh-russh-sftp/src/client/fs/file.rs | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edb80b38..7097badb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,7 @@ dependencies = [ "bytes", "chrono", "flurry", + "futures", "log", "serde", "serde_bytes", diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 0adbe6df..76e3935c 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -257,10 +257,7 @@ impl File { // Flush in-order chunks to writer as they become available. while let Some(chunk) = pending.remove(&next_to_write) { let n = chunk.len() as u64; - writer - .write_all(&chunk) - .await - .map_err(io::Error::from)?; + writer.write_all(&chunk).await.map_err(io::Error::from)?; next_to_write += n; total += n; } From e20c0684ce585c7c3eced732b217fce6205d0538 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sun, 10 May 2026 18:44:46 +0900 Subject: [PATCH 3/3] fix: harden SFTP pipelined transfers --- crates/bssh-russh-sftp/src/client/fs/file.rs | 284 +++++++++++++++++-- 1 file changed, 255 insertions(+), 29 deletions(-) diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 76e3935c..a0e28d44 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -21,6 +21,10 @@ type StateFn = Option> + Send + Syn const MAX_READ_LENGTH: u64 = 261120; const MAX_WRITE_LENGTH: u64 = 261120; +fn bounded_chunk_size(limit: Option, default_limit: u64) -> usize { + limit.map_or(default_limit, |n| n.min(default_limit)) as usize +} + struct FileState { f_read: StateFn>>, f_seek: StateFn, @@ -123,13 +127,10 @@ impl File { )); } - let chunk_size = self - .extensions - .limits - .as_ref() - .and_then(|l| l.write_len) - .map(|n| n as usize) - .unwrap_or(MAX_WRITE_LENGTH as usize); + let chunk_size = bounded_chunk_size( + self.extensions.limits.as_ref().and_then(|l| l.write_len), + MAX_WRITE_LENGTH, + ); let mut total: u64 = 0; let mut offset = self.pos; @@ -140,7 +141,7 @@ impl File { // Top up the pipeline with new chunks until we hit the cap or EOF. while !eof && in_flight.len() < max_inflight { let mut buf = vec![0u8; chunk_size]; - let n = reader.read(&mut buf).await.map_err(io::Error::from)?; + let n = reader.read(&mut buf).await?; if n == 0 { eof = true; break; @@ -166,7 +167,7 @@ impl File { match in_flight.next().await { Some(Ok(_)) => {} Some(Err(e)) => return Err(e), - None => break, // pipeline drained and no more data → done + None => break, // pipeline drained and no more data -> done } } @@ -176,13 +177,13 @@ impl File { /// Streams the remote file from the current position to `writer` using up to /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up - /// to the negotiated `read_len` (or [`MAX_READ_LENGTH`] when no limit is - /// advertised). + /// to the negotiated `read_len`, capped at [`MAX_READ_LENGTH`]. /// /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks /// are reassembled in offset order before being written to `writer`, so the - /// output is identical to a sequential read. Stops on the first server - /// short read (server signalled EOF). + /// output is identical to a sequential read. For regular files, the current + /// file size is used to avoid speculative reads beyond EOF; if the size is + /// unavailable, the transfer stops on EOF or the first short read. /// /// Returns the number of bytes streamed. Updates `self.pos`. pub async fn read_to_writer_pipelined( @@ -203,13 +204,16 @@ impl File { )); } - let chunk_size = self - .extensions - .limits - .as_ref() - .and_then(|l| l.read_len) - .map(|n| n as usize) - .unwrap_or(MAX_READ_LENGTH as usize); + let chunk_size = bounded_chunk_size( + self.extensions.limits.as_ref().and_then(|l| l.read_len), + MAX_READ_LENGTH, + ); + let file_end = self + .metadata() + .await + .ok() + .and_then(|m| m.size) + .filter(|&size| size >= self.pos); let mut total: u64 = 0; let mut next_offset = self.pos; @@ -219,35 +223,59 @@ impl File { let mut eof = false; loop { - // Schedule new read requests until we hit the cap or have observed EOF. - while !eof && in_flight.len() < max_inflight { + // Keep the total reorder buffer bounded. A slow early read can make + // later replies arrive first; counting both pending and in-flight + // chunks prevents unbounded memory growth in that case. + while !eof + && in_flight.len() + pending.len() < max_inflight + && file_end.is_none_or(|end| next_offset < end) + { let session = self.session.clone(); let handle = self.handle.clone(); let off = next_offset; - let len = chunk_size as u32; + let len = file_end.map_or(chunk_size as u64, |end| { + (end - next_offset).min(chunk_size as u64) + }) as u32; in_flight.push(async move { match session.read(handle, off, len).await { - Ok(data) => SftpResult::Ok((off, Some(data.data))), + Ok(data) => SftpResult::Ok((off, len, Some(data.data))), Err(Error::Status(s)) if s.status_code == StatusCode::Eof => { - SftpResult::Ok((off, None)) + SftpResult::Ok((off, len, None)) } Err(e) => Err(e), } }); - next_offset += chunk_size as u64; + next_offset += u64::from(len); } match in_flight.next().await { - Some(Ok((off, Some(data)))) => { + Some(Ok((off, len, Some(data)))) => { if data.is_empty() { eof = true; } else { + if let Some(end) = file_end { + let got_end = off.saturating_add(data.len() as u64); + if data.len() != len as usize || got_end > end { + return Err(Error::UnexpectedBehavior(format!( + "short read before EOF at offset {off}: requested {len} bytes, received {} bytes", + data.len() + ))); + } + } else if data.len() < len as usize { + eof = true; + } + pending.insert(off, data); } } - Some(Ok((_, None))) => { + Some(Ok((off, _, None))) => { + if file_end.is_some_and(|end| off < end) { + return Err(Error::UnexpectedBehavior(format!( + "unexpected EOF before file size at offset {off}" + ))); + } eof = true; } Some(Err(e)) => return Err(e), @@ -257,7 +285,7 @@ impl File { // Flush in-order chunks to writer as they become available. while let Some(chunk) = pending.remove(&next_to_write) { let n = chunk.len() as u64; - writer.write_all(&chunk).await.map_err(io::Error::from)?; + writer.write_all(&chunk).await?; next_to_write += n; total += n; } @@ -268,6 +296,204 @@ impl File { } } +#[cfg(test)] +mod tests { + use std::{ + future::Future, + sync::{Arc, Mutex}, + }; + + use tokio::io::duplex; + + use super::*; + use crate::{ + client::SftpSession, + protocol::{Attrs, Data, FileAttributes, Handle, OpenFlags, Status, Version}, + server, + server::Handler, + }; + + struct MemoryHandler { + data: Arc>>, + } + + impl MemoryHandler { + fn ok_status(id: u32) -> Status { + Status { + id, + status_code: StatusCode::Ok, + error_message: String::new(), + language_tag: String::new(), + } + } + } + + impl Handler for MemoryHandler { + type Error = StatusCode; + + fn unimplemented(&self) -> Self::Error { + StatusCode::OpUnsupported + } + + fn init( + &mut self, + _version: u32, + _extensions: std::collections::HashMap, + ) -> impl Future> + Send { + async { Ok(Version::new()) } + } + + fn open( + &mut self, + id: u32, + _filename: String, + _pflags: OpenFlags, + _attrs: FileAttributes, + ) -> impl Future> + Send { + async move { + Ok(Handle { + id, + handle: "memory".to_owned(), + }) + } + } + + fn close( + &mut self, + id: u32, + _handle: String, + ) -> impl Future> + Send { + async move { Ok(Self::ok_status(id)) } + } + + fn fstat( + &mut self, + id: u32, + _handle: String, + ) -> impl Future> + Send { + let data = self.data.clone(); + + async move { + let mut attrs = FileAttributes::empty(); + attrs.size = Some(data.lock().expect("memory file lock poisoned").len() as u64); + Ok(Attrs { id, attrs }) + } + } + + fn read( + &mut self, + id: u32, + _handle: String, + offset: u64, + len: u32, + ) -> impl Future> + Send { + let data = self.data.clone(); + + async move { + let data = data.lock().expect("memory file lock poisoned"); + let offset = usize::try_from(offset).map_err(|_| StatusCode::Failure)?; + if offset >= data.len() { + return Err(StatusCode::Eof); + } + let end = offset.saturating_add(len as usize).min(data.len()); + + Ok(Data { + id, + data: data[offset..end].to_vec(), + }) + } + } + + fn write( + &mut self, + id: u32, + _handle: String, + offset: u64, + bytes: Vec, + ) -> impl Future> + Send { + let data = self.data.clone(); + + async move { + let mut data = data.lock().expect("memory file lock poisoned"); + let offset = usize::try_from(offset).map_err(|_| StatusCode::Failure)?; + let end = offset.checked_add(bytes.len()).ok_or(StatusCode::Failure)?; + if data.len() < end { + data.resize(end, 0); + } + data[offset..end].copy_from_slice(&bytes); + + Ok(Self::ok_status(id)) + } + } + } + + async fn memory_session(data: Arc>>) -> SftpSession { + let (client, server_stream) = duplex(64 * 1024); + server::run(server_stream, MemoryHandler { data }).await; + SftpSession::new(client).await.expect("memory SFTP init") + } + + #[test] + fn advertised_chunk_sizes_are_capped() { + assert_eq!( + bounded_chunk_size(None, MAX_READ_LENGTH), + MAX_READ_LENGTH as usize + ); + assert_eq!(bounded_chunk_size(Some(1024), MAX_READ_LENGTH), 1024); + assert_eq!( + bounded_chunk_size(Some(MAX_READ_LENGTH * 4), MAX_READ_LENGTH), + MAX_READ_LENGTH as usize + ); + } + + #[tokio::test] + async fn write_all_pipelined_streams_all_bytes() { + let remote_data = Arc::new(Mutex::new(Vec::new())); + let sftp = memory_session(remote_data.clone()).await; + let input: Vec = (0..(MAX_WRITE_LENGTH as usize * 2 + 123)) + .map(|n| (n % 251) as u8) + .collect(); + let mut reader = &input[..]; + let mut file = sftp + .open_with_flags( + "ignored", + OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE, + ) + .await + .expect("open memory file"); + + let written = file + .write_all_pipelined(&mut reader, 4) + .await + .expect("pipelined write"); + + assert_eq!(written as usize, input.len()); + assert_eq!( + *remote_data.lock().expect("memory file lock poisoned"), + input + ); + } + + #[tokio::test] + async fn read_to_writer_pipelined_streams_all_bytes() { + let input: Vec = (0..(MAX_READ_LENGTH as usize * 2 + 123)) + .map(|n| (n % 251) as u8) + .collect(); + let remote_data = Arc::new(Mutex::new(input.clone())); + let sftp = memory_session(remote_data).await; + let mut file = sftp.open("ignored").await.expect("open memory file"); + let mut output = Vec::new(); + + let read = file + .read_to_writer_pipelined(&mut output, 4) + .await + .expect("pipelined read"); + + assert_eq!(read as usize, input.len()); + assert_eq!(output, input); + } +} + impl Drop for File { fn drop(&mut self) { if self.closed {