diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 1b9d82eaf4506..2777b753bb37a 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -61,6 +61,10 @@ struct SpillPoolShared { /// Writer's reference to the current file (shared by all cloned writers). /// Has its own lock to allow I/O without blocking queue access. current_write_file: Option>>, + /// Number of active writer clones. Only when this reaches zero should + /// `writer_dropped` be set to true. This prevents premature EOF signaling + /// when one writer clone is dropped while others are still active. + active_writer_count: usize, } impl SpillPoolShared { @@ -72,6 +76,7 @@ impl SpillPoolShared { waker: None, writer_dropped: false, current_write_file: None, + active_writer_count: 1, } } @@ -97,7 +102,6 @@ impl SpillPoolShared { /// The writer automatically manages file rotation based on the `max_file_size_bytes` /// configured in [`channel`]. When the last writer clone is dropped, it finalizes the /// current file so readers can access all written data. -#[derive(Clone)] pub struct SpillPoolWriter { /// Maximum size in bytes before rotating to a new file. /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. @@ -106,6 +110,18 @@ pub struct SpillPoolWriter { shared: Arc>, } +impl Clone for SpillPoolWriter { + fn clone(&self) -> Self { + // Increment the active writer count so that `writer_dropped` is only + // set to true when the *last* clone is dropped. + self.shared.lock().active_writer_count += 1; + Self { + max_file_size_bytes: self.max_file_size_bytes, + shared: Arc::clone(&self.shared), + } + } +} + impl SpillPoolWriter { /// Spills a batch to the pool, rotating files when necessary. /// @@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter { fn drop(&mut self) { let mut shared = self.shared.lock(); + shared.active_writer_count -= 1; + let is_last_writer = shared.active_writer_count == 0; + + if !is_last_writer { + // Other writer clones are still active; do not finalize or + // signal EOF to readers. + return; + } + // Finalize the current file when the last writer is dropped if let Some(current_file) = shared.current_write_file.take() { // Release shared lock before locking file @@ -1343,6 +1368,81 @@ mod tests { Ok(()) } + /// Verifies that the reader stays alive as long as any writer clone exists. + /// + /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning + /// mode multiple input partition tasks share clones of the same writer. + /// The reader must not see EOF until **all** clones have been dropped, + /// even if the queue is temporarily empty between writes from different + /// clones. + /// + /// The test sequence is: + /// + /// 1. writer1 writes a batch, then is dropped. + /// 2. The reader consumes that batch (queue is now empty). + /// 3. writer2 (still alive) writes a batch. + /// 4. The reader must see that batch. + /// 5. EOF is only signalled after writer2 is also dropped. + #[tokio::test] + async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> { + let (writer1, mut reader) = create_spill_channel(1024 * 1024); + let writer2 = writer1.clone(); + + // Synchronization: tell writer2 when it may proceed. + let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>(); + + // Spawn writer2 — it waits for the signal before writing. + let writer2_handle = SpawnedTask::spawn(async move { + proceed_rx.await.unwrap(); + writer2.push_batch(&create_test_batch(10, 10)).unwrap(); + // writer2 is dropped here (last clone → true EOF) + }); + + // Writer1 writes one batch, then drops. + writer1.push_batch(&create_test_batch(0, 10))?; + drop(writer1); + + // Read writer1's batch. + let batch1 = reader.next().await.unwrap()?; + assert_eq!(batch1.num_rows(), 10); + let col = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 0); + + // Signal writer2 to write its batch. It will execute when the + // current task yields (i.e. when reader.next() returns Pending). + proceed_tx.send(()).unwrap(); + + // The reader should wait (Pending) for writer2's data, not EOF. + let batch2 = + tokio::time::timeout(std::time::Duration::from_secs(5), reader.next()) + .await + .expect("Reader timed out — should not hang"); + + assert!( + batch2.is_some(), + "Reader must not return EOF while a writer clone is still alive" + ); + let batch2 = batch2.unwrap()?; + assert_eq!(batch2.num_rows(), 10); + let col = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 10); + + writer2_handle.await.unwrap(); + + // All writers dropped — reader should see real EOF now. + assert!(reader.next().await.is_none()); + + Ok(()) + } + #[tokio::test] async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> { use datafusion_execution::runtime_env::RuntimeEnvBuilder;