From 2e33c0557949eaf6d42d109d2f24023a0f27ba4c Mon Sep 17 00:00:00 2001 From: Peter L Date: Fri, 6 Feb 2026 22:00:05 +1030 Subject: [PATCH] Fix Arrow Spill Underrun (#20159) ## Which issue does this PR close? - Closes https://github.com/apache/datafusion/issues/19425 ## Rationale for this change This adjusts the way that the spill channel works. Currently we have a spill writer & reader pairing which uses a mutex to coordindate when a file is ready to be read. What happens is, that because we were using a `spawn_buffered` call, the read task would race ahead trying to read a file which is yet to be written out completely. Alongside this, we need to flush each write to the file, as there is a chance that another thread may see stale data. ## What changes are included in this PR? Adds a flush on write, and converts the read task to not buffer reads. ## Are these changes tested? I haven't written a test, but I have been running the example in the attached issue. While it now fails with allocation errors, the original error goes away. ## Are there any user-facing changes? Nope --- .../src/spill/in_progress_spill_file.rs | 7 +++++++ datafusion/physical-plan/src/spill/mod.rs | 5 +++++ datafusion/physical-plan/src/spill/spill_manager.rs | 13 +++++++++++++ datafusion/physical-plan/src/spill/spill_pool.rs | 8 +++++++- 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index d2acf4993b857..b9ff6b2f3b655 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -88,6 +88,13 @@ impl InProgressSpillFile { Ok(()) } + pub fn flush(&mut self) -> Result<()> { + if let Some(writer) = &mut self.writer { + writer.flush()?; + } + Ok(()) + } + /// Returns a reference to the in-progress file, if it exists. /// This can be used to get the file path for creating readers before the file is finished. pub fn file(&self) -> Option<&RefCountedTempFile> { diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 78dea99ac820c..3c4ee065c3151 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -310,6 +310,11 @@ impl IPCStreamWriter { Ok((delta_num_rows, delta_num_bytes)) } + pub fn flush(&mut self) -> Result<()> { + self.writer.flush()?; + Ok(()) + } + /// Finish the writer pub fn finish(&mut self) -> Result<()> { self.writer.finish().map_err(Into::into) diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 89b0276206774..6d931112ad888 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -188,6 +188,19 @@ impl SpillManager { Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) } + + /// Same as `read_spill_as_stream`, but without buffering. + pub fn read_spill_as_stream_unbuffered( + &self, + spill_file_path: RefCountedTempFile, + max_record_batch_memory: Option, + ) -> Result { + Ok(Box::pin(cooperative(SpillReaderStream::new( + Arc::clone(&self.schema), + spill_file_path, + max_record_batch_memory, + )))) + } } pub(crate) trait GetSlicedSize { diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index e3b547b5731f3..e8eea360da8cc 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -194,6 +194,8 @@ impl SpillPoolWriter { // Append the batch if let Some(ref mut writer) = file_shared.writer { writer.append_batch(batch)?; + // make sure we flush the writer for readers + writer.flush()?; file_shared.batches_written += 1; file_shared.estimated_size += batch_size; } @@ -535,7 +537,11 @@ impl Stream for SpillFile { // Step 2: Lazy-create reader stream if needed if self.reader.is_none() && should_read { if let Some(file) = file { - match self.spill_manager.read_spill_as_stream(file, None) { + // we want this unbuffered because files are actively being written to + match self + .spill_manager + .read_spill_as_stream_unbuffered(file, None) + { Ok(stream) => { self.reader = Some(SpillFileReader { stream,