From 09a8630f2f441d8674677d08d48dc887c9e55310 Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 3 Mar 2026 14:40:30 +0000 Subject: [PATCH 1/2] Fix repartition from dropping data when spilling --- .../physical-plan/src/spill/spill_pool.rs | 109 +++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 1b9d82eaf4506..ad755e13a15f1 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -61,6 +61,10 @@ struct SpillPoolShared { /// Writer's reference to the current file (shared by all cloned writers). /// Has its own lock to allow I/O without blocking queue access. current_write_file: Option>>, + /// Number of active writer clones. Only when this reaches zero should + /// `writer_dropped` be set to true. This prevents premature EOF signaling + /// when one writer clone is dropped while others are still active. + active_writer_count: usize, } impl SpillPoolShared { @@ -72,6 +76,7 @@ impl SpillPoolShared { waker: None, writer_dropped: false, current_write_file: None, + active_writer_count: 1, } } @@ -97,7 +102,6 @@ impl SpillPoolShared { /// The writer automatically manages file rotation based on the `max_file_size_bytes` /// configured in [`channel`]. When the last writer clone is dropped, it finalizes the /// current file so readers can access all written data. -#[derive(Clone)] pub struct SpillPoolWriter { /// Maximum size in bytes before rotating to a new file. /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. @@ -106,6 +110,18 @@ pub struct SpillPoolWriter { shared: Arc>, } +impl Clone for SpillPoolWriter { + fn clone(&self) -> Self { + // Increment the active writer count so that `writer_dropped` is only + // set to true when the *last* clone is dropped. + self.shared.lock().active_writer_count += 1; + Self { + max_file_size_bytes: self.max_file_size_bytes, + shared: Arc::clone(&self.shared), + } + } +} + impl SpillPoolWriter { /// Spills a batch to the pool, rotating files when necessary. /// @@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter { fn drop(&mut self) { let mut shared = self.shared.lock(); + shared.active_writer_count -= 1; + let is_last_writer = shared.active_writer_count == 0; + + if !is_last_writer { + // Other writer clones are still active; do not finalize or + // signal EOF to readers. + return; + } + // Finalize the current file when the last writer is dropped if let Some(current_file) = shared.current_write_file.take() { // Release shared lock before locking file @@ -1343,6 +1368,88 @@ mod tests { Ok(()) } + /// Regression test for data loss when multiple writer clones are used. + /// + /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning + /// mode all input partition tasks share clones of the same writer. Before + /// the fix, `Drop` unconditionally set `writer_dropped = true` even when + /// other clones were still alive. This caused the `SpillPoolReader` to + /// return EOF prematurely, silently losing every batch written by the + /// remaining writers. + /// + /// The test sequence is: + /// + /// 1. writer1 writes a batch, then is dropped. + /// 2. The reader consumes that batch. + /// 3. The reader polls again — the queue is now empty. + /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF). + /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data). + /// 4. writer2 (still alive) writes a batch. + /// 5. The reader must see that batch — not silently lose it. + #[tokio::test] + async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> { + let (writer1, mut reader) = create_spill_channel(1024 * 1024); + let writer2 = writer1.clone(); + + // Synchronization: tell writer2 when it may proceed. + let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>(); + + // Spawn writer2 — it waits for the signal before writing. + let writer2_handle = SpawnedTask::spawn(async move { + proceed_rx.await.unwrap(); + writer2.push_batch(&create_test_batch(10, 10)).unwrap(); + // writer2 is dropped here (last clone → true EOF) + }); + + // Writer1 writes one batch, then drops. + writer1.push_batch(&create_test_batch(0, 10))?; + drop(writer1); + + // Read writer1's batch. + let batch1 = reader.next().await.unwrap()?; + assert_eq!(batch1.num_rows(), 10); + let col = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 0); + + // Signal writer2 to write its batch. It will execute when the + // current task yields (i.e. when reader.next() returns Pending). + proceed_tx.send(()).unwrap(); + + // With the bug the reader returns None here because it already + // saw writer_dropped=true on an empty queue. With the fix it + // returns Pending, the runtime schedules writer2, and the batch + // becomes available. + let batch2 = + tokio::time::timeout(std::time::Duration::from_secs(5), reader.next()) + .await + .expect("Reader timed out — should not hang"); + + assert!( + batch2.is_some(), + "Reader returned None prematurely — batch from writer2 was lost \ + because dropping writer1 incorrectly signaled EOF" + ); + let batch2 = batch2.unwrap()?; + assert_eq!(batch2.num_rows(), 10); + let col = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 10); + + writer2_handle.await.unwrap(); + + // All writers dropped — reader should see real EOF now. + assert!(reader.next().await.is_none()); + + Ok(()) + } + #[tokio::test] async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> { use datafusion_execution::runtime_env::RuntimeEnvBuilder; From 59cfe3b5eeed79455254a974b28a39e70ec375e3 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 15:28:23 +0000 Subject: [PATCH 2/2] comment --- .../physical-plan/src/spill/spill_pool.rs | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index ad755e13a15f1..2777b753bb37a 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -1368,24 +1368,21 @@ mod tests { Ok(()) } - /// Regression test for data loss when multiple writer clones are used. + /// Verifies that the reader stays alive as long as any writer clone exists. /// /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning - /// mode all input partition tasks share clones of the same writer. Before - /// the fix, `Drop` unconditionally set `writer_dropped = true` even when - /// other clones were still alive. This caused the `SpillPoolReader` to - /// return EOF prematurely, silently losing every batch written by the - /// remaining writers. + /// mode multiple input partition tasks share clones of the same writer. + /// The reader must not see EOF until **all** clones have been dropped, + /// even if the queue is temporarily empty between writes from different + /// clones. /// /// The test sequence is: /// /// 1. writer1 writes a batch, then is dropped. - /// 2. The reader consumes that batch. - /// 3. The reader polls again — the queue is now empty. - /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF). - /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data). - /// 4. writer2 (still alive) writes a batch. - /// 5. The reader must see that batch — not silently lose it. + /// 2. The reader consumes that batch (queue is now empty). + /// 3. writer2 (still alive) writes a batch. + /// 4. The reader must see that batch. + /// 5. EOF is only signalled after writer2 is also dropped. #[tokio::test] async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> { let (writer1, mut reader) = create_spill_channel(1024 * 1024); @@ -1419,10 +1416,7 @@ mod tests { // current task yields (i.e. when reader.next() returns Pending). proceed_tx.send(()).unwrap(); - // With the bug the reader returns None here because it already - // saw writer_dropped=true on an empty queue. With the fix it - // returns Pending, the runtime schedules writer2, and the batch - // becomes available. + // The reader should wait (Pending) for writer2's data, not EOF. let batch2 = tokio::time::timeout(std::time::Duration::from_secs(5), reader.next()) .await @@ -1430,8 +1424,7 @@ mod tests { assert!( batch2.is_some(), - "Reader returned None prematurely — batch from writer2 was lost \ - because dropping writer1 incorrectly signaled EOF" + "Reader must not return EOF while a writer clone is still alive" ); let batch2 = batch2.unwrap()?; assert_eq!(batch2.num_rows(), 10);