From bfa1a938d62cfa452151d2439529e05e6882c6a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 18:04:32 +0200 Subject: [PATCH 1/7] feat(parquet): row-group morselization for sibling FileStream stealing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a parquet file is scanned inside a shared sibling-stream pool (the `SharedWorkSource` introduced by #21351), the first stream to open the file now donates its remaining row groups back to the shared queue so idle sibling partitions can steal them. A single large parquet file no longer bottlenecks on one worker. Implementation: - `ParquetOpenState` gains a `SplitAndDonate` state between `LoadMetadata` and `PrepareFilters`. The donor keeps the first eligible row group and pushes each remaining one onto the front of the shared queue as a `PartitionedFile` clone whose `range` is a one-byte `FileRange` at the row group's starting offset. The existing `prune_by_range` path matches that offset and scopes the stealer to just that row group — no new extension types, no metadata carriage, no access-plan donation. - If the caller pre-narrowed the scan with a `file_range` that still covers multiple row groups (byte-range file partitioning), splitting stays inside that range: donated ranges remain subsets of the caller's. - Caller-supplied `ParquetAccessPlan` in `extensions` and single-row- group scopes suppress donation. - `SharedWorkSource` is `pub` and gets `push_front` / `pop_front` / `Default`. `row_group_start_offset` is extracted so it's shared with `prune_by_range`. Stealers re-load the parquet footer; object stores typically cache the range so this is cheap. Sharing loaded metadata across siblings is left for a follow-up. 5 new tests cover: basic donation + stealer round-trip, single-RG files, caller access-plan suppression, splitting inside a caller `file_range`, and single-RG caller ranges. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 330 +++++++++++++++++- .../src/row_group_filter.rs | 23 +- datafusion/datasource-parquet/src/source.rs | 4 +- datafusion/datasource/src/file.rs | 9 +- .../datasource/src/file_scan_config/mod.rs | 9 +- datafusion/datasource/src/file_stream/mod.rs | 1 + .../datasource/src/file_stream/work_source.rs | 27 +- 7 files changed, 383 insertions(+), 20 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..4a5de9056ef0a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,13 +19,16 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; -use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; +use crate::row_group_filter::{ + BloomFilterStatistics, RowGroupAccessPlanFilter, row_group_start_offset, +}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_datasource::file_stream::SharedWorkSource; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -136,6 +139,11 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Shared work queue of sibling FileStreams, if any. When present, this + /// morselizer may split large parquet files into row-group-sized chunks + /// and donate all-but-one back onto this queue so idle siblings steal + /// them instead of sitting idle behind a single hot file. + pub shared_work_source: Option, } impl fmt::Debug for ParquetMorselizer { @@ -175,6 +183,9 @@ impl Morselizer for ParquetMorselizer { /// LoadMetadata /// | /// v +/// SplitAndDonate +/// | +/// v /// PrepareFilters /// | /// v @@ -213,6 +224,11 @@ enum ParquetOpenState { PruneFile(Box), /// Loading Parquet metadata (in footer) LoadMetadata(BoxFuture<'static, Result>), + /// Optionally donate row-group chunks of this file back to the shared + /// work queue so idle sibling streams can scan them in parallel. CPU-only; + /// the donated chunks carry the already-loaded `ArrowReaderMetadata` so + /// stealers skip the footer round-trip. + SplitAndDonate(Box), /// Specialize any filters for the actual file schema (only known after /// metadata is loaded) PrepareFilters(Box), @@ -242,6 +258,7 @@ impl fmt::Debug for ParquetOpenState { ParquetOpenState::LoadEncryption(_) => "LoadEncryption", ParquetOpenState::PruneFile(_) => "PruneFile", ParquetOpenState::LoadMetadata(_) => "LoadMetadata", + ParquetOpenState::SplitAndDonate(_) => "SplitAndDonate", ParquetOpenState::PrepareFilters(_) => "PrepareFilters", ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex", ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", @@ -289,6 +306,10 @@ struct PreparedParquetOpen { preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, + /// Shared work queue used to donate row-group chunks of this file to + /// sibling streams. `None` when the file was already donated (re-donation + /// is intentionally disabled) or when sibling stealing is disabled. + shared_work_source: Option, } /// State of [`ParquetOpenState`] @@ -375,6 +396,10 @@ impl ParquetOpenState { ParquetOpenState::LoadMetadata(future) => { Ok(ParquetOpenState::LoadMetadata(future)) } + ParquetOpenState::SplitAndDonate(loaded) => { + let next = loaded.split_and_donate()?; + Ok(ParquetOpenState::PrepareFilters(Box::new(next))) + } ParquetOpenState::PrepareFilters(loaded) => { let prepared_filters = loaded.prepare_filters()?; Ok(ParquetOpenState::LoadPageIndex( @@ -498,7 +523,7 @@ impl MorselPlanner for ParquetMorselPlanner { } ParquetOpenState::LoadMetadata(future) => { Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + Ok(ParquetOpenState::SplitAndDonate(Box::new(future.await?))) }))) } ParquetOpenState::LoadPageIndex(future) => { @@ -537,6 +562,7 @@ impl ParquetMorselizer { ) -> Result { let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); + let shared_work_source = self.shared_work_source.clone(); let file_name = partitioned_file.object_meta.location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -658,6 +684,7 @@ impl ParquetMorselizer { preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, + shared_work_source, }) } } @@ -743,6 +770,75 @@ impl PreparedParquetOpen { } impl MetadataLoadedParquetOpen { + /// Donate row-group chunks of this file back to the shared work queue + /// so idle sibling FileStreams steal them. + /// + /// The donor keeps the first eligible row group; each remaining one is + /// pushed to the front of the shared queue as a `PartitionedFile` clone + /// whose `range` is a one-byte `FileRange` at the row group's starting + /// offset. The existing `prune_by_range` path on the stealer matches + /// that offset and keeps only the targeted row group. Stealers re-load + /// the parquet footer; object stores typically cache it so this is + /// cheap. + /// + /// When the caller pre-narrowed the scan with a `file_range` spanning + /// multiple row groups, splitting stays inside that range. + /// + /// Returns `self` unchanged if any guard fails (no shared queue, + /// caller-supplied access plan, or fewer than two row groups in scope). + fn split_and_donate(mut self) -> Result { + let Some(shared) = self.prepared.shared_work_source.take() else { + return Ok(self); + }; + // Caller-supplied `ParquetAccessPlan` is respected as-is. + if let Some(ext) = self.prepared.extensions.as_ref() + && ext.is::() + { + return Ok(self); + } + + let rgs = self.reader_metadata.metadata().row_groups(); + if rgs.len() < 2 { + return Ok(self); + } + + // One-byte `FileRange` at `start` selects exactly one row group via + // `prune_by_range`'s `contains(offset)` check. + let point_range = |start: i64| datafusion_datasource::FileRange { + start, + end: start + 1, + }; + + // Row groups in scope, paired with their starting offset so we only + // walk the metadata once. + let caller_range = self.prepared.file_range.as_ref(); + let eligible: Vec<(usize, i64)> = rgs + .iter() + .enumerate() + .map(|(idx, md)| (idx, row_group_start_offset(md))) + .filter(|(_, offset)| caller_range.is_none_or(|r| r.contains(*offset))) + .collect(); + if eligible.len() < 2 { + return Ok(self); + } + + let (_keep_idx, keep_offset) = eligible[0]; + let donated_files: Vec = eligible[1..] + .iter() + .map(|&(_, offset)| { + let mut file = self.prepared.partitioned_file.clone(); + file.range = Some(point_range(offset)); + file + }) + .collect(); + shared.push_front(donated_files); + + // Donor takes only the kept row group. Safe to override the caller's + // wider range: donated chunks cover every other in-scope row group. + self.prepared.file_range = Some(point_range(keep_offset)); + Ok(self) + } + /// Prepare file-schema coercions and pruning predicates once metadata is /// loaded. fn prepare_filters(self) -> Result { @@ -1676,6 +1772,7 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + shared_work_source: Option, } impl ParquetMorselizerBuilder { @@ -1702,9 +1799,17 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + shared_work_source: None, } } + /// Attach a shared work source so the built morselizer can donate + /// row-group chunks to sibling streams. + fn with_shared_work_source(mut self, shared: SharedWorkSource) -> Self { + self.shared_work_source = Some(shared); + self + } + /// Set the object store (required for building). fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); @@ -1816,6 +1921,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + shared_work_source: self.shared_work_source, } } } @@ -2720,4 +2826,224 @@ mod test { "without page index all rows are returned" ); } + + /// Write a 4-row-group file (3 rows per row group) and return + /// `(store, schema, file, data_len)`. + async fn write_four_row_group_file() + -> (Arc, SchemaRef, PartitionedFile, usize) { + let store: Arc = Arc::new(InMemory::new()); + let batches = vec![ + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(), + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(), + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(), + record_batch!(("a", Int32, vec![Some(10), Some(11), Some(12)])).unwrap(), + ]; + let schema = batches[0].schema(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "rg_split.parquet", + batches, + Some(props), + ) + .await; + let file = PartitionedFile::new( + "rg_split.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + (store, schema, file, data_len) + } + + /// Build a single-column morselizer, optionally attached to a shared + /// work source so it can donate. + fn split_test_morselizer( + store: &Arc, + schema: &SchemaRef, + shared: Option<&SharedWorkSource>, + ) -> ParquetMorselizer { + let mut b = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(store)) + .with_schema(Arc::clone(schema)) + .with_projection_indices(&[0]); + if let Some(shared) = shared { + b = b.with_shared_work_source(shared.clone()); + } + b.build() + } + + /// Donor keeps row group 0 and pushes N-1 donated chunks to the + /// shared queue. Donated chunks carry a one-byte `FileRange` and + /// no other magic — no `extensions` payload, no new types. + #[tokio::test] + async fn row_group_split_donates_remaining_row_groups() { + let (store, schema, file, _) = write_four_row_group_file().await; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file.clone()).await.unwrap(); + let donor_values = collect_int32_values(stream).await; + assert_eq!( + donor_values, + vec![1, 2, 3], + "donor should read only row group 0" + ); + + // Pop donated chunks off the shared queue and read each. + let mut stolen: Vec> = Vec::new(); + while let Some(donated) = shared.pop_front() { + assert!( + donated.range.is_some(), + "donated chunk must carry a FileRange" + ); + assert!( + donated.extensions.is_none(), + "donated chunk must not carry an extensions payload" + ); + let stealer = split_test_morselizer(&store, &schema, None); + let stream = open_file(&stealer, donated).await.unwrap(); + stolen.push(collect_int32_values(stream).await); + } + assert_eq!( + stolen, + vec![vec![4, 5, 6], vec![7, 8, 9], vec![10, 11, 12]], + "each stealer should get exactly one row group, in file order" + ); + } + + /// A single-row-group file has nothing to donate. + #[tokio::test] + async fn row_group_split_skips_single_row_group_file() { + let store: Arc = Arc::new(InMemory::new()); + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_len = + write_parquet(Arc::clone(&store), "single.parquet", batch.clone()).await; + let file = PartitionedFile::new( + "single.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + let shared = SharedWorkSource::default(); + let schema = batch.schema(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!(values, vec![1, 2, 3]); + assert!( + shared.pop_front().is_none(), + "single-row-group file must not donate" + ); + } + + /// A caller-supplied `ParquetAccessPlan` in `extensions` is respected + /// as-is — no donation happens even if the file has many row groups. + #[tokio::test] + async fn row_group_split_respects_caller_access_plan() { + let (store, schema, file, _) = write_four_row_group_file().await; + let mut caller_plan = ParquetAccessPlan::new_all(4); + caller_plan.skip(0); + caller_plan.skip(2); + let file = file.with_extensions(Arc::new(caller_plan)); + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![4, 5, 6, 10, 11, 12], + "caller plan scans RGs 1 and 3, skipping 0 and 2" + ); + assert!( + shared.pop_front().is_none(), + "caller-supplied access plan must suppress donation" + ); + } + + /// A caller-supplied `file_range` that spans several row groups is + /// still split: donated chunks' narrow ranges are subsets of the + /// caller's range, so caller intent (byte-range partitioning of the + /// file across planner-level partitions) is preserved. + #[tokio::test] + async fn row_group_split_within_caller_file_range() { + let (store, schema, file, data_len) = write_four_row_group_file().await; + let caller_range = datafusion_datasource::FileRange { + start: 0, + end: data_len as i64, + }; + let file = PartitionedFile { + range: Some(caller_range.clone()), + ..file + }; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let donor_values = collect_int32_values(stream).await; + assert_eq!(donor_values, vec![1, 2, 3]); + + let mut donated_count = 0; + let mut all_stolen: Vec = Vec::new(); + while let Some(donated) = shared.pop_front() { + let donated_range = donated + .range + .clone() + .expect("donated chunk needs FileRange"); + assert!( + donated_range.start >= caller_range.start + && donated_range.end <= caller_range.end, + "donated range must stay inside caller's range" + ); + donated_count += 1; + let stealer = split_test_morselizer(&store, &schema, None); + let stream = open_file(&stealer, donated).await.unwrap(); + all_stolen.extend(collect_int32_values(stream).await); + } + assert_eq!(donated_count, 3); + assert_eq!(all_stolen, vec![4, 5, 6, 7, 8, 9, 10, 11, 12]); + } + + /// A caller-supplied `file_range` that contains only a single row + /// group has nothing to split — no donation should happen. + #[tokio::test] + async fn row_group_split_skips_when_caller_range_covers_single_row_group() { + let (store, schema, file, _) = write_four_row_group_file().await; + // Read metadata to locate row group 1's offset, then make a + // caller range that contains only row group 1. + let reader: Box = + DefaultParquetFileReaderFactory::new(Arc::clone(&store)) + .create_reader(0, file.clone(), None, &ExecutionPlanMetricsSet::new()) + .unwrap(); + let md = ArrowReaderMetadata::load_async( + &mut { reader }, + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip), + ) + .await + .unwrap(); + let rg1_offset = row_group_start_offset(md.metadata().row_group(1)); + + let file = PartitionedFile { + range: Some(datafusion_datasource::FileRange { + start: rg1_offset, + end: rg1_offset + 1, + }), + ..file + }; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!(values, vec![4, 5, 6], "caller range isolates row group 1"); + assert!( + shared.pop_front().is_none(), + "single-row-group caller range must suppress donation" + ); + } } diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 3f254c9f55282..3c4961c57858f 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -33,6 +33,17 @@ use parquet::data_type::Decimal; use parquet::schema::types::SchemaDescriptor; use parquet::{bloom_filter::Sbbf, file::metadata::RowGroupMetaData}; +/// Starting byte offset of a row group in its parquet file. +/// +/// Uses the first column's dictionary page offset when present, otherwise its +/// data page offset — intentionally *not* the metadata location, per +/// . +pub fn row_group_start_offset(metadata: &RowGroupMetaData) -> i64 { + let col = metadata.column(0); + col.dictionary_page_offset() + .unwrap_or_else(|| col.data_page_offset()) +} + /// Reduces the [`ParquetAccessPlan`] based on row group level metadata. /// /// This struct implements the various types of pruning that are applied to a @@ -224,17 +235,7 @@ impl RowGroupAccessPlanFilter { if !self.access_plan.should_scan(idx) { continue; } - - // Skip the row group if the first dictionary/data page are not - // within the range. - // - // note don't use the location of metadata - // - let col = metadata.column(0); - let offset = col - .dictionary_page_offset() - .unwrap_or_else(|| col.data_page_offset()); - if !range.contains(offset) { + if !range.contains(row_group_start_offset(metadata)) { self.access_plan.skip(idx); } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..b08e99656e0ac 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; -use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::file_stream::{FileOpener, SharedWorkSource}; use datafusion_datasource::morsel::Morselizer; use arrow::datatypes::TimeUnit; @@ -526,6 +526,7 @@ impl FileSource for ParquetSource { object_store: Arc, base_config: &FileScanConfig, partition: usize, + shared_work_source: Option, ) -> datafusion_common::Result> { let expr_adapter_factory = base_config .expr_adapter_factory @@ -580,6 +581,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + shared_work_source, })) } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9b4ae5827ae8b..d8ecad3db1a11 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; -use crate::file_stream::FileOpener; +use crate::file_stream::{FileOpener, SharedWorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; @@ -82,11 +82,18 @@ pub trait FileSource: Any + Send + Sync { /// /// It is preferred to implement the [`Morselizer`] API directly by /// implementing this method. + /// + /// `shared_work_source`, when `Some`, is the queue of unopened files + /// shared across sibling streams. File sources that can sub-divide a + /// single file into smaller stealable work units (e.g. parquet row-group + /// splitting) may push donated chunks onto it; sources that cannot simply + /// ignore the parameter. fn create_morselizer( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, + _shared_work_source: Option, ) -> Result> { let opener = self.create_file_opener(object_store, base_config, partition)?; Ok(Box::new(FileOpenerMorselizer::new(opener))) diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..aeed3c16670df 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -597,8 +597,6 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); - let morselizer = source.create_morselizer(object_store, self, partition)?; - // Extract the shared work source from the sibling state if it exists. // This allows multiple sibling streams to steal work from a single // shared queue of unopened files. @@ -607,6 +605,13 @@ impl DataSource for FileScanConfig { .and_then(|state| state.downcast_ref::()) .cloned(); + let morselizer = source.create_morselizer( + object_store, + self, + partition, + shared_work_source.clone(), + )?; + let stream = FileStreamBuilder::new(self) .with_partition(partition) .with_shared_work_source(shared_work_source) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index e277690cff810..596c8a78099cb 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -47,6 +47,7 @@ use self::scan_state::{ScanAndReturn, ScanState}; pub use builder::FileStreamBuilder; pub use metrics::{FileStreamMetrics, StartableTime}; +pub use work_source::SharedWorkSource; /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 7f31dacca9592..66d90076cc4dd 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -64,7 +64,7 @@ impl WorkSource { /// It uses a [`Mutex`] internally to provide thread-safe access /// to the shared file queue. #[derive(Debug, Clone)] -pub(crate) struct SharedWorkSource { +pub struct SharedWorkSource { inner: Arc, } @@ -73,6 +73,12 @@ pub(super) struct SharedWorkSourceInner { files: Mutex>, } +impl Default for SharedWorkSource { + fn default() -> Self { + Self::new(std::iter::empty()) + } +} + impl SharedWorkSource { /// Create a shared work source containing the provided unopened files. pub(crate) fn new(files: impl IntoIterator) -> Self { @@ -91,8 +97,23 @@ impl SharedWorkSource { /// Pop the next file from the shared work queue. /// - /// Returns `None` if the queue is empty - fn pop_front(&self) -> Option { + /// Returns `None` if the queue is empty. + pub fn pop_front(&self) -> Option { self.inner.files.lock().pop_front() } + + /// Push files to the front of the shared work queue. + /// + /// Used when an in-flight file is sub-divided (e.g. into row-group-sized + /// chunks): the donor keeps one chunk and pushes the rest to the front so + /// any idle sibling picks them up before starting work on an unrelated + /// whole file. Items preserve their order: the first element of `items` + /// ends up at the very front of the queue. + pub fn push_front(&self, items: impl IntoIterator) { + let items: Vec = items.into_iter().collect(); + let mut queue = self.inner.files.lock(); + for file in items.into_iter().rev() { + queue.push_front(file); + } + } } From 4ff800f0669b1a72890b1ea70d0206b5a0e4ed8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 20:48:38 +0200 Subject: [PATCH 2/7] refactor(parquet): donor finalizes pruning; stealers go to BuildStream directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves `SplitAndDonate` from between `LoadMetadata` and `PrepareFilters` to *after* `PruneWithBloomFilters`, and restructures stealer paths so row-group morselization composes with the full pruning pipeline. **Donor path**: - Runs the existing pipeline unchanged: file-level pruning → metadata load → prepare filters → page index → stats pruning → bloom pruning. - `SplitAndDonate` then runs `prune_by_limit` (moved out of `build_stream`) as a separate file-level pass, picks the first surviving row group, and packages each remaining one into a `ParquetOpenChunk` containing the access plan, loaded `ArrowReaderMetadata`, prepared `PruningPredicate`, `PagePruningAccessPlanFilter`, physical schema, and rewritten predicate/projection. **Stealer path**: - `ParquetMorselPlanner::try_new` detects a `ParquetOpenChunk` on the incoming `PartitionedFile` and constructs state directly at `BuildStream` via `build_stealer_state`. No metadata load, no predicate rebuild, no pruning traversal — the stealer just builds its reader against the donor's finalized access plan. **Shared work queue split**: - `SharedWorkSource` now has two queues: `morsels` (pre-prepared chunks with finalized state) and `files` (whole unopened files). `pop_front` drains morsels first so their latency stays low. Donor calls `push_morsels` instead of the old `push_front` convention. **Removed state/guards** (no longer needed with direct-BuildStream entry): - `PreparedParquetOpen::is_donated_chunk` and `preloaded_reader_metadata` fields. - The `is_donated_chunk` short-circuits in `prune_file`, `prepare_open_file`, `load`, `prune_row_groups`, and `split_and_donate`. Limit-pruning tests (`test_limit_pruning_*` in `datafusion/core/tests/parquet/row_group_pruning.rs`) pass — the donor sees the full row-group picture for `prune_by_limit`, stealers inherit the pruned plan. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 375 +++++++++++++----- .../datasource/src/file_stream/work_source.rs | 50 ++- 2 files changed, 301 insertions(+), 124 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 4a5de9056ef0a..eb4e10e96f0d0 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,9 +19,7 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; -use crate::row_group_filter::{ - BloomFilterStatistics, RowGroupAccessPlanFilter, row_group_start_offset, -}; +use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, @@ -183,9 +181,6 @@ impl Morselizer for ParquetMorselizer { /// LoadMetadata /// | /// v -/// SplitAndDonate -/// | -/// v /// PrepareFilters /// | /// v @@ -201,6 +196,9 @@ impl Morselizer for ParquetMorselizer { /// PruneWithBloomFilters /// | /// v +/// SplitAndDonate +/// | +/// v /// BuildStream /// | /// v @@ -224,11 +222,6 @@ enum ParquetOpenState { PruneFile(Box), /// Loading Parquet metadata (in footer) LoadMetadata(BoxFuture<'static, Result>), - /// Optionally donate row-group chunks of this file back to the shared - /// work queue so idle sibling streams can scan them in parallel. CPU-only; - /// the donated chunks carry the already-loaded `ArrowReaderMetadata` so - /// stealers skip the footer round-trip. - SplitAndDonate(Box), /// Specialize any filters for the actual file schema (only known after /// metadata is loaded) PrepareFilters(Box), @@ -240,6 +233,11 @@ enum ParquetOpenState { LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters PruneWithBloomFilters(Box), + /// Apply file-level LIMIT pruning (runs after range/stats/bloom so it + /// sees which row groups are still in scope) and donate the survivors + /// in 1-RG chunks back to the shared work queue. Stealers pop a chunk + /// with a finalized access plan and skip every earlier pruning stage. + SplitAndDonate(Box), /// Builds the final reader stream /// /// TODO: split state as this currently does both I/O and CPU work. @@ -258,12 +256,12 @@ impl fmt::Debug for ParquetOpenState { ParquetOpenState::LoadEncryption(_) => "LoadEncryption", ParquetOpenState::PruneFile(_) => "PruneFile", ParquetOpenState::LoadMetadata(_) => "LoadMetadata", - ParquetOpenState::SplitAndDonate(_) => "SplitAndDonate", ParquetOpenState::PrepareFilters(_) => "PrepareFilters", ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex", ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", + ParquetOpenState::SplitAndDonate(_) => "SplitAndDonate", ParquetOpenState::BuildStream(_) => "BuildStream", ParquetOpenState::Ready(_) => "Ready", ParquetOpenState::Done => "Done", @@ -307,11 +305,37 @@ struct PreparedParquetOpen { #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, /// Shared work queue used to donate row-group chunks of this file to - /// sibling streams. `None` when the file was already donated (re-donation - /// is intentionally disabled) or when sibling stealing is disabled. + /// sibling streams. `None` when sibling stealing is disabled. shared_work_source: Option, } +/// Extension carried on a donated `PartitionedFile`. +/// +/// Donation happens after the donor has already run every pre-scan stage +/// (file-level pruning, metadata load, filter preparation, page index +/// load, stats / bloom / limit pruning). The chunk packages the full +/// result so the stealer's state machine can start directly at +/// `BuildStream` — no footer round-trip, no pruning, no predicate build. +#[derive(Clone)] +pub(crate) struct ParquetOpenChunk { + pub access_plan: ParquetAccessPlan, + pub reader_metadata: ArrowReaderMetadata, + pub options: ArrowReaderOptions, + pub physical_file_schema: SchemaRef, + pub predicate: Option>, + pub projection: ProjectionExprs, + pub pruning_predicate: Option>, + pub page_pruning_predicate: Option>, +} + +impl fmt::Debug for ParquetOpenChunk { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetOpenChunk") + .field("access_plan", &self.access_plan) + .finish_non_exhaustive() + } +} + /// State of [`ParquetOpenState`] /// /// Result of loading parquet metadata after file-level pruning is complete. @@ -396,10 +420,6 @@ impl ParquetOpenState { ParquetOpenState::LoadMetadata(future) => { Ok(ParquetOpenState::LoadMetadata(future)) } - ParquetOpenState::SplitAndDonate(loaded) => { - let next = loaded.split_and_donate()?; - Ok(ParquetOpenState::PrepareFilters(Box::new(next))) - } ParquetOpenState::PrepareFilters(loaded) => { let prepared_filters = loaded.prepare_filters()?; Ok(ParquetOpenState::LoadPageIndex( @@ -419,8 +439,12 @@ impl ParquetOpenState { Ok(ParquetOpenState::LoadBloomFilters(future)) } ParquetOpenState::PruneWithBloomFilters(loaded) => Ok( - ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())), + ParquetOpenState::SplitAndDonate(Box::new(loaded.prune_bloom_filters())), ), + ParquetOpenState::SplitAndDonate(prepared) => { + let next = prepared.split_and_donate()?; + Ok(ParquetOpenState::BuildStream(Box::new(next))) + } ParquetOpenState::BuildStream(prepared) => { Ok(ParquetOpenState::Ready(prepared.build_stream()?)) } @@ -472,6 +496,20 @@ impl fmt::Debug for ParquetMorselPlanner { impl ParquetMorselPlanner { fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result { + // A donated chunk carries the full handoff state: jump straight + // to `BuildStream` and skip every pruning stage. + if let Some(chunk) = file + .extensions + .as_ref() + .and_then(|ext| ext.downcast_ref::()) + .cloned() + { + let built = morselizer.build_stealer_state(file, chunk)?; + return Ok(Self { + state: ParquetOpenState::BuildStream(Box::new(built)), + }); + } + let prepared = morselizer.prepare_open_file(file)?; #[cfg(feature = "parquet_encryption")] let state = ParquetOpenState::Start { @@ -523,7 +561,7 @@ impl MorselPlanner for ParquetMorselPlanner { } ParquetOpenState::LoadMetadata(future) => { Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::SplitAndDonate(Box::new(future.await?))) + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) }))) } ParquetOpenState::LoadPageIndex(future) => { @@ -687,6 +725,101 @@ impl ParquetMorselizer { shared_work_source, }) } + + /// Construct the state for a donated row-group chunk: the stealer + /// starts directly at [`ParquetOpenState::BuildStream`]. We skip the + /// entire pre-scan pipeline (file-level pruning, metadata load, + /// filter preparation, page-index load, stats / bloom / limit + /// pruning) by reusing the donor's already-computed state carried + /// on the `PartitionedFile`. + fn build_stealer_state( + &self, + partitioned_file: PartitionedFile, + chunk: ParquetOpenChunk, + ) -> Result { + let file_name = partitioned_file.object_meta.location.to_string(); + let file_metrics = + ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); + let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index); + let metadata_size_hint = partitioned_file + .metadata_size_hint + .or(self.metadata_size_hint); + let async_file_reader: Box = + self.parquet_file_reader_factory.create_reader( + self.partition_index, + partitioned_file.clone(), + metadata_size_hint, + &self.metrics, + )?; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) + .global_counter("num_predicate_creation_errors"); + let logical_file_schema = Arc::clone(self.table_schema.file_schema()); + let output_schema = Arc::new( + self.projection + .project_schema(self.table_schema.table_schema())?, + ); + + let ParquetOpenChunk { + access_plan, + reader_metadata, + options, + physical_file_schema, + predicate, + projection, + pruning_predicate, + page_pruning_predicate, + } = chunk; + + let prepared = PreparedParquetOpen { + partition_index: self.partition_index, + partitioned_file, + file_range: None, + extensions: None, + file_name, + file_metrics, + baseline_metrics, + file_pruner: None, + metadata_size_hint, + metrics: self.metrics.clone(), + parquet_file_reader_factory: Arc::clone(&self.parquet_file_reader_factory), + async_file_reader, + batch_size: self.batch_size, + logical_file_schema, + physical_file_schema, + output_schema, + projection, + predicate, + reorder_predicates: self.reorder_filters, + pushdown_filters: self.pushdown_filters, + force_filter_selections: self.force_filter_selections, + enable_page_index: self.enable_page_index, + enable_bloom_filter: self.enable_bloom_filter, + enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, + limit: self.limit, + coerce_int96: self.coerce_int96, + expr_adapter_factory: Arc::clone(&self.expr_adapter_factory), + predicate_creation_errors, + max_predicate_cache_size: self.max_predicate_cache_size, + reverse_row_groups: self.reverse_row_groups, + preserve_order: self.preserve_order, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + shared_work_source: None, + }; + Ok(RowGroupsPrunedParquetOpen { + prepared: FiltersPreparedParquetOpen { + loaded: MetadataLoadedParquetOpen { + prepared, + reader_metadata, + options, + }, + pruning_predicate, + page_pruning_predicate, + }, + row_groups: RowGroupAccessPlanFilter::new(access_plan), + }) + } } impl PreparedParquetOpen { @@ -770,75 +903,6 @@ impl PreparedParquetOpen { } impl MetadataLoadedParquetOpen { - /// Donate row-group chunks of this file back to the shared work queue - /// so idle sibling FileStreams steal them. - /// - /// The donor keeps the first eligible row group; each remaining one is - /// pushed to the front of the shared queue as a `PartitionedFile` clone - /// whose `range` is a one-byte `FileRange` at the row group's starting - /// offset. The existing `prune_by_range` path on the stealer matches - /// that offset and keeps only the targeted row group. Stealers re-load - /// the parquet footer; object stores typically cache it so this is - /// cheap. - /// - /// When the caller pre-narrowed the scan with a `file_range` spanning - /// multiple row groups, splitting stays inside that range. - /// - /// Returns `self` unchanged if any guard fails (no shared queue, - /// caller-supplied access plan, or fewer than two row groups in scope). - fn split_and_donate(mut self) -> Result { - let Some(shared) = self.prepared.shared_work_source.take() else { - return Ok(self); - }; - // Caller-supplied `ParquetAccessPlan` is respected as-is. - if let Some(ext) = self.prepared.extensions.as_ref() - && ext.is::() - { - return Ok(self); - } - - let rgs = self.reader_metadata.metadata().row_groups(); - if rgs.len() < 2 { - return Ok(self); - } - - // One-byte `FileRange` at `start` selects exactly one row group via - // `prune_by_range`'s `contains(offset)` check. - let point_range = |start: i64| datafusion_datasource::FileRange { - start, - end: start + 1, - }; - - // Row groups in scope, paired with their starting offset so we only - // walk the metadata once. - let caller_range = self.prepared.file_range.as_ref(); - let eligible: Vec<(usize, i64)> = rgs - .iter() - .enumerate() - .map(|(idx, md)| (idx, row_group_start_offset(md))) - .filter(|(_, offset)| caller_range.is_none_or(|r| r.contains(*offset))) - .collect(); - if eligible.len() < 2 { - return Ok(self); - } - - let (_keep_idx, keep_offset) = eligible[0]; - let donated_files: Vec = eligible[1..] - .iter() - .map(|&(_, offset)| { - let mut file = self.prepared.partitioned_file.clone(); - file.range = Some(point_range(offset)); - file - }) - .collect(); - shared.push_front(donated_files); - - // Donor takes only the kept row group. Safe to override the caller's - // wider range: donated chunks cover every other in-scope row group. - self.prepared.file_range = Some(point_range(keep_offset)); - Ok(self) - } - /// Prepare file-schema coercions and pruning predicates once metadata is /// loaded. fn prepare_filters(self) -> Result { @@ -1151,6 +1215,102 @@ impl BloomFiltersLoadedParquetOpen { } impl RowGroupsPrunedParquetOpen { + /// File-level LIMIT pruning + row-group donation. + /// + /// Runs after stats + bloom pruning so the access plan reflects every + /// pruning decision before `prune_by_limit` picks fully-matched row + /// groups (which requires the whole-file view). Once limit pruning is + /// done, the donor keeps the first surviving row group and pushes + /// each remaining one to the front of the shared queue as a + /// `PartitionedFile` clone whose `extensions` carry a + /// `ParquetOpenChunk` (finalized access plan + pre-loaded metadata). + /// Stealers pop a chunk and skip every pruning stage on the way to + /// `BuildStream`. + /// + /// No-op for stealers (`is_donated_chunk`) and when there are fewer + /// than two row groups left in scope. + fn split_and_donate(mut self) -> Result { + // File-level LIMIT pruning — a separate pass that needs the + // whole file's row-group picture. Only the donor reaches this + // code (stealers start at `BuildStream` directly). + if let (Some(limit), false) = ( + self.prepared.loaded.prepared.limit, + self.prepared.loaded.prepared.preserve_order, + ) { + let rg_metadata = self + .prepared + .loaded + .reader_metadata + .metadata() + .row_groups() + .to_vec(); + self.row_groups.prune_by_limit( + limit, + &rg_metadata, + &self.prepared.loaded.prepared.file_metrics, + ); + } + + let Some(shared) = self.prepared.loaded.prepared.shared_work_source.take() else { + return Ok(self); + }; + // Respect a caller-supplied `ParquetAccessPlan`. + if let Some(ext) = self.prepared.loaded.prepared.extensions.as_ref() + && ext.is::() + { + return Ok(self); + } + + let eligible: Vec = self.row_groups.row_group_indexes().collect(); + if eligible.len() < 2 { + return Ok(self); + } + + let num_rgs = self + .prepared + .loaded + .reader_metadata + .metadata() + .num_row_groups(); + let single_rg_plan = |idx: usize| -> ParquetAccessPlan { + let mut plan = ParquetAccessPlan::new_none(num_rgs); + plan.scan(idx); + plan + }; + + // Bundle everything the stealer needs to jump straight to + // `BuildStream`: metadata, access plan, reader options, and the + // already-built predicates / projection / schema. + let make_chunk = |idx: usize| ParquetOpenChunk { + access_plan: single_rg_plan(idx), + reader_metadata: self.prepared.loaded.reader_metadata.clone(), + options: self.prepared.loaded.options.clone(), + physical_file_schema: Arc::clone( + &self.prepared.loaded.prepared.physical_file_schema, + ), + predicate: self.prepared.loaded.prepared.predicate.clone(), + projection: self.prepared.loaded.prepared.projection.clone(), + pruning_predicate: self.prepared.pruning_predicate.clone(), + page_pruning_predicate: self.prepared.page_pruning_predicate.clone(), + }; + + let keep_idx = eligible[0]; + let donated_files: Vec = eligible[1..] + .iter() + .map(|&idx| { + let mut file = self.prepared.loaded.prepared.partitioned_file.clone(); + file.range = None; + file.extensions = Some(Arc::new(make_chunk(idx))); + file + }) + .collect(); + shared.push_morsels(donated_files); + + // Narrow the donor's access plan to just the kept row group. + self.row_groups = RowGroupAccessPlanFilter::new(single_rg_plan(keep_idx)); + Ok(self) + } + /// Build the final parquet stream once all pruning work is complete. fn build_stream(self) -> Result>> { let RowGroupsPrunedParquetOpen { @@ -1640,15 +1800,21 @@ fn create_initial_plan( row_group_count: usize, ) -> Result { if let Some(extensions) = extensions { - if let Some(access_plan) = extensions.downcast_ref::() { + let access_plan = + if let Some(plan) = extensions.downcast_ref::() { + Some(plan) + } else { + extensions + .downcast_ref::() + .map(|c| &c.access_plan) + }; + if let Some(access_plan) = access_plan { let plan_len = access_plan.len(); if plan_len != row_group_count { return exec_err!( "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}" ); } - - // check row group count matches the plan return Ok(access_plan.clone()); } else { debug!("DataSourceExec Ignoring unknown extension specified for {file_name}"); @@ -1720,6 +1886,7 @@ async fn load_page_index( mod test { use super::*; use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; + use crate::row_group_filter::row_group_start_offset; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess}; use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -2895,12 +3062,15 @@ mod test { let mut stolen: Vec> = Vec::new(); while let Some(donated) = shared.pop_front() { assert!( - donated.range.is_some(), - "donated chunk must carry a FileRange" + donated.range.is_none(), + "donated chunk should not use byte range — the access plan specifies the row group" ); assert!( - donated.extensions.is_none(), - "donated chunk must not carry an extensions payload" + donated + .extensions + .as_ref() + .is_some_and(|ext| ext.is::()), + "donated chunk must carry ParquetOpenChunk" ); let stealer = split_test_morselizer(&store, &schema, None); let stream = open_file(&stealer, donated).await.unwrap(); @@ -2990,20 +3160,19 @@ mod test { let mut donated_count = 0; let mut all_stolen: Vec = Vec::new(); while let Some(donated) = shared.pop_front() { - let donated_range = donated - .range - .clone() - .expect("donated chunk needs FileRange"); assert!( - donated_range.start >= caller_range.start - && donated_range.end <= caller_range.end, - "donated range must stay inside caller's range" + donated + .extensions + .as_ref() + .is_some_and(|ext| ext.is::()), + "donated chunk must carry ParquetOpenChunk" ); donated_count += 1; let stealer = split_test_morselizer(&store, &schema, None); let stream = open_file(&stealer, donated).await.unwrap(); all_stolen.extend(collect_int32_values(stream).await); } + let _ = caller_range; assert_eq!(donated_count, 3); assert_eq!(all_stolen, vec![4, 5, 6, 7, 8, 9, 10, 11, 12]); } diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 66d90076cc4dd..41ebdbb6efa80 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -55,14 +55,20 @@ impl WorkSource { } } -/// Shared source of work for sibling `FileStream`s +/// Shared source of work for sibling `FileStream`s. /// -/// The queue is created once per execution and shared by all reorderable -/// sibling streams for that execution. Whichever stream becomes idle first may -/// take the next unopened file from the front of the queue. +/// Created once per execution and shared by all reorderable sibling streams. +/// Holds two queues: /// -/// It uses a [`Mutex`] internally to provide thread-safe access -/// to the shared file queue. +/// - **morsels**: pre-prepared sub-file work items (e.g. parquet row-group +/// chunks donated mid-open by a sibling). Always popped first. +/// - **files**: whole unopened files — the initial scan units. +/// +/// A FileStream that picks up a morsel has finalized state attached to it +/// (via `PartitionedFile::extensions`) and can skip most of the per-file +/// state machine. Draining morsels first keeps their latency low and +/// prevents siblings from starting fresh whole files while half-processed +/// sub-file work sits idle. #[derive(Debug, Clone)] pub struct SharedWorkSource { inner: Arc, @@ -70,6 +76,7 @@ pub struct SharedWorkSource { #[derive(Debug, Default)] pub(super) struct SharedWorkSourceInner { + morsels: Mutex>, files: Mutex>, } @@ -82,9 +89,10 @@ impl Default for SharedWorkSource { impl SharedWorkSource { /// Create a shared work source containing the provided unopened files. pub(crate) fn new(files: impl IntoIterator) -> Self { - let files = files.into_iter().collect(); + let files: VecDeque = files.into_iter().collect(); Self { inner: Arc::new(SharedWorkSourceInner { + morsels: Mutex::new(VecDeque::new()), files: Mutex::new(files), }), } @@ -95,25 +103,25 @@ impl SharedWorkSource { Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) } - /// Pop the next file from the shared work queue. + /// Pop the next item of work — morsels (pre-prepared sub-file chunks) + /// first, then whole files. /// - /// Returns `None` if the queue is empty. + /// Returns `None` if both queues are empty. pub fn pop_front(&self) -> Option { + if let Some(morsel) = self.inner.morsels.lock().pop_front() { + return Some(morsel); + } self.inner.files.lock().pop_front() } - /// Push files to the front of the shared work queue. + /// Push pre-prepared morsels onto the morsel queue. /// - /// Used when an in-flight file is sub-divided (e.g. into row-group-sized - /// chunks): the donor keeps one chunk and pushes the rest to the front so - /// any idle sibling picks them up before starting work on an unrelated - /// whole file. Items preserve their order: the first element of `items` - /// ends up at the very front of the queue. - pub fn push_front(&self, items: impl IntoIterator) { - let items: Vec = items.into_iter().collect(); - let mut queue = self.inner.files.lock(); - for file in items.into_iter().rev() { - queue.push_front(file); - } + /// Used when an in-flight file is sub-divided (e.g. parquet row-group + /// splitting): each donated chunk carries its finalized state via + /// `PartitionedFile::extensions` so the stealer can skip most of the + /// per-file state machine. Items preserve their order. + pub fn push_morsels(&self, items: impl IntoIterator) { + let mut queue = self.inner.morsels.lock(); + queue.extend(items); } } From 16967cd5fca63b05a3dd5785583279f877849fd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 20:58:50 +0200 Subject: [PATCH 3/7] cleanup: trim dead code and redundant comments from row-group split path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Inline the `caller_range` construction in `row_group_split_within_caller_file_range` test and drop the vestigial `let _ = caller_range;` binding left over from the earlier file-range-based donation mechanism. - Update `split_and_donate` docstring: the stale `is_donated_chunk` reference predates the direct-to-BuildStream entry path. Stealers now never reach this function. - Drop `rg_metadata.to_vec()` in the LIMIT pruning pass — `prune_by_limit` takes `&[RowGroupMetaData]`, so the slice is enough and we save one allocation per limit-pruned file. - Delete two "what-not-why" narrating comments from the donation path ("Bundle everything the stealer needs..." and "Narrow the donor's access plan...") — the code is self-explanatory. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 40 +++++++-------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index eb4e10e96f0d0..ea9b5434280a2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1224,29 +1224,23 @@ impl RowGroupsPrunedParquetOpen { /// each remaining one to the front of the shared queue as a /// `PartitionedFile` clone whose `extensions` carry a /// `ParquetOpenChunk` (finalized access plan + pre-loaded metadata). - /// Stealers pop a chunk and skip every pruning stage on the way to - /// `BuildStream`. + /// Stealers pop a chunk and start at `BuildStream` directly (see + /// [`ParquetMorselizer::build_stealer_state`]); they never reach + /// this function. /// - /// No-op for stealers (`is_donated_chunk`) and when there are fewer - /// than two row groups left in scope. + /// No-op when there are fewer than two row groups left in scope, no + /// shared queue is attached, or the caller supplied their own + /// `ParquetAccessPlan`. fn split_and_donate(mut self) -> Result { - // File-level LIMIT pruning — a separate pass that needs the - // whole file's row-group picture. Only the donor reaches this - // code (stealers start at `BuildStream` directly). if let (Some(limit), false) = ( self.prepared.loaded.prepared.limit, self.prepared.loaded.prepared.preserve_order, ) { - let rg_metadata = self - .prepared - .loaded - .reader_metadata - .metadata() - .row_groups() - .to_vec(); + let rg_metadata = + self.prepared.loaded.reader_metadata.metadata().row_groups(); self.row_groups.prune_by_limit( limit, - &rg_metadata, + rg_metadata, &self.prepared.loaded.prepared.file_metrics, ); } @@ -1254,7 +1248,6 @@ impl RowGroupsPrunedParquetOpen { let Some(shared) = self.prepared.loaded.prepared.shared_work_source.take() else { return Ok(self); }; - // Respect a caller-supplied `ParquetAccessPlan`. if let Some(ext) = self.prepared.loaded.prepared.extensions.as_ref() && ext.is::() { @@ -1278,9 +1271,6 @@ impl RowGroupsPrunedParquetOpen { plan }; - // Bundle everything the stealer needs to jump straight to - // `BuildStream`: metadata, access plan, reader options, and the - // already-built predicates / projection / schema. let make_chunk = |idx: usize| ParquetOpenChunk { access_plan: single_rg_plan(idx), reader_metadata: self.prepared.loaded.reader_metadata.clone(), @@ -1305,8 +1295,6 @@ impl RowGroupsPrunedParquetOpen { }) .collect(); shared.push_morsels(donated_files); - - // Narrow the donor's access plan to just the kept row group. self.row_groups = RowGroupAccessPlanFilter::new(single_rg_plan(keep_idx)); Ok(self) } @@ -3141,12 +3129,11 @@ mod test { #[tokio::test] async fn row_group_split_within_caller_file_range() { let (store, schema, file, data_len) = write_four_row_group_file().await; - let caller_range = datafusion_datasource::FileRange { - start: 0, - end: data_len as i64, - }; let file = PartitionedFile { - range: Some(caller_range.clone()), + range: Some(datafusion_datasource::FileRange { + start: 0, + end: data_len as i64, + }), ..file }; let shared = SharedWorkSource::default(); @@ -3172,7 +3159,6 @@ mod test { let stream = open_file(&stealer, donated).await.unwrap(); all_stolen.extend(collect_int32_values(stream).await); } - let _ = caller_range; assert_eq!(donated_count, 3); assert_eq!(all_stolen, vec![4, 5, 6, 7, 8, 9, 10, 11, 12]); } From 641e6ccdf31a9b163d05006c0c4cfa6f1cc4b13a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 22:02:25 +0200 Subject: [PATCH 4/7] perf(parquet): build row-filter candidates once per file, share to stealers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `build_row_filter` does two things at very different costs: 1. Walk the predicate, split conjuncts, build a `FilterCandidate` per conjunct (resolves ProjectionMask + projected schema + required-bytes estimate), and optionally reorder by cost. This is schema/metadata work that is identical for every open of the same file. 2. Bind each candidate to the current open's metrics counters. Before this change, both ran per open — so a 226-RG file split into 226 chunks paid the analysis cost 226×. After this change, the donor (or an un-split file open) builds the `Vec` once in `prepare_filters`; donated chunks carry it through `ParquetOpenChunk`; each `build_stream` does only the cheap metric binding via the new `row_filter_from_candidates`. Refactor: - Split `row_filter::build_row_filter` into `build_row_filter_candidates` (expensive, metrics-free) and `row_filter_from_candidates` (cheap, per-open). `build_row_filter` becomes a thin wrapper. - `FilterCandidate` now `Clone`. - `FiltersPreparedParquetOpen` gains `row_filter_candidates: Option>>`, built in `prepare_filters` from the donor's rewritten predicate. - `ParquetOpenChunk` carries the same `Arc` across the handoff so stealers reuse it in `build_stream`. - `build_stream` now calls `row_filter_from_candidates` on the cached vec instead of re-running the full builder. Correctness: each open still gets its own metric bindings — only the candidate analysis is shared. Existing tests pass (103 lib + 200 integration). Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 69 ++++++++++++++----- .../datasource-parquet/src/row_filter.rs | 53 ++++++++++---- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index ea9b5434280a2..d0a9ad5a5abec 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -18,11 +18,14 @@ //! [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; -use crate::row_filter::build_projection_read_plan; +use crate::row_filter::{ + FilterCandidate, build_projection_read_plan, build_row_filter_candidates, + row_filter_from_candidates, +}; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, + apply_file_schema_type_coercions, coerce_int96_to_resolution, }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; @@ -326,6 +329,7 @@ pub(crate) struct ParquetOpenChunk { pub projection: ProjectionExprs, pub pruning_predicate: Option>, pub page_pruning_predicate: Option>, + pub row_filter_candidates: Option>>, } impl fmt::Debug for ParquetOpenChunk { @@ -353,6 +357,11 @@ struct FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen, pruning_predicate: Option>, page_pruning_predicate: Option>, + /// Row-filter conjunct candidates built once here and shared with + /// sibling stealers via `ParquetOpenChunk` so each open doesn't + /// redo the schema-walk + ProjectionMask build. Only the cheap + /// per-open metric binding happens at `build_stream` time. + row_filter_candidates: Option>>, } /// State of [`ParquetOpenState`] @@ -769,6 +778,7 @@ impl ParquetMorselizer { projection, pruning_predicate, page_pruning_predicate, + row_filter_candidates, } = chunk; let prepared = PreparedParquetOpen { @@ -816,6 +826,7 @@ impl ParquetMorselizer { }, pruning_predicate, page_pruning_predicate, + row_filter_candidates, }, row_groups: RowGroupAccessPlanFilter::new(access_plan), }) @@ -999,6 +1010,32 @@ impl MetadataLoadedParquetOpen { None }; + // Build row-filter candidates once here — the expensive part of + // pushdown-filter construction (schema walk, ProjectionMask + // build, cost-based reorder). Stealers of donated chunks reuse + // this via `ParquetOpenChunk`. + let row_filter_candidates = if let Some(predicate) = prepared + .pushdown_filters + .then_some(prepared.predicate.as_ref()) + .flatten() + { + match build_row_filter_candidates( + predicate, + &physical_file_schema, + reader_metadata.metadata(), + prepared.reorder_predicates, + ) { + Ok(Some(cs)) => Some(Arc::new(cs)), + Ok(None) => None, + Err(e) => { + debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + None + } + } + } else { + None + }; + Ok(FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen { prepared, @@ -1007,6 +1044,7 @@ impl MetadataLoadedParquetOpen { }, pruning_predicate, page_pruning_predicate, + row_filter_candidates, }) } } @@ -1282,6 +1320,7 @@ impl RowGroupsPrunedParquetOpen { projection: self.prepared.loaded.prepared.projection.clone(), pruning_predicate: self.prepared.pruning_predicate.clone(), page_pruning_predicate: self.prepared.page_pruning_predicate.clone(), + row_filter_candidates: self.prepared.row_filter_candidates.clone(), }; let keep_idx = eligible[0]; @@ -1309,6 +1348,7 @@ impl RowGroupsPrunedParquetOpen { loaded, pruning_predicate: _, page_pruning_predicate, + row_filter_candidates, } = prepared; let MetadataLoadedParquetOpen { prepared, @@ -1319,25 +1359,16 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); - // Filter pushdown: evaluate predicates during scan - let row_filter = if let Some(predicate) = prepared - .pushdown_filters - .then_some(prepared.predicate.clone()) - .flatten() + // Filter pushdown: evaluate predicates during scan. The + // expensive candidate construction ran once in `prepare_filters`; + // here we only do the cheap per-open metric binding. + let row_filter = if let Some(candidates) = row_filter_candidates.as_deref() + && prepared.pushdown_filters { - let row_filter = row_filter::build_row_filter( - &predicate, - &prepared.physical_file_schema, - file_metadata.as_ref(), - prepared.reorder_predicates, - &prepared.file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => Some(filter), - Ok(None) => None, + match row_filter_from_candidates(candidates, &prepared.file_metrics) { + Ok(filter) => Some(filter), Err(e) => { - debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + debug!("Ignoring error building row filter: {e}"); None } } diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index c5c372055826b..0478269db1bec 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -177,6 +177,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { /// of evaluating the resulting expression. /// /// See the module level documentation for more information. +#[derive(Clone)] pub(crate) struct FilterCandidate { expr: Arc, /// Estimate for the total number of bytes that will need to be processed @@ -1019,10 +1020,28 @@ pub fn build_row_filter( reorder_predicates: bool, file_metrics: &ParquetFileMetrics, ) -> Result> { - let rows_pruned = &file_metrics.pushdown_rows_pruned; - let rows_matched = &file_metrics.pushdown_rows_matched; - let time = &file_metrics.row_pushdown_eval_time; + let Some(candidates) = + build_row_filter_candidates(expr, file_schema, metadata, reorder_predicates)? + else { + return Ok(None); + }; + row_filter_from_candidates(&candidates, file_metrics).map(Some) +} +/// Expensive, metrics-free first phase of row-filter construction. +/// +/// Splits the predicate into conjuncts, resolves each one against the file +/// schema and parquet metadata (building [`ProjectionMask`]s and cost +/// estimates), and optionally reorders candidates by estimated cost. The +/// result is the same for every open of a given (predicate, file_schema, +/// file_metadata) triple, so a donor may build it once and share it with +/// sibling stealers. +pub(crate) fn build_row_filter_candidates( + expr: &Arc, + file_schema: &SchemaRef, + metadata: &ParquetMetaData, + reorder_predicates: bool, +) -> Result>> { // Split into conjuncts: // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); @@ -1039,7 +1058,6 @@ pub fn build_row_filter( .flatten() .collect(); - // no candidates if candidates.is_empty() { return Ok(None); } @@ -1047,6 +1065,22 @@ pub fn build_row_filter( if reorder_predicates { candidates.sort_unstable_by_key(|c| c.required_bytes); } + Ok(Some(candidates)) +} + +/// Cheap per-open second phase: wire each candidate up with the current +/// file's row-filter metrics. +/// +/// Called separately from [`build_row_filter_candidates`] so that the +/// expensive candidate construction can be shared across sibling opens +/// of the same file. +pub(crate) fn row_filter_from_candidates( + candidates: &[FilterCandidate], + file_metrics: &ParquetFileMetrics, +) -> Result { + let rows_pruned = &file_metrics.pushdown_rows_pruned; + let rows_matched = &file_metrics.pushdown_rows_matched; + let time = &file_metrics.row_pushdown_eval_time; // To avoid double-counting metrics when multiple predicates are used: // - All predicates should count rows_pruned (cumulative pruned rows) @@ -1055,23 +1089,18 @@ pub fn build_row_filter( let total_candidates = candidates.len(); candidates - .into_iter() + .iter() .enumerate() .map(|(idx, candidate)| { let is_last = idx == total_candidates - 1; - - // All predicates share the pruned counter (cumulative) let predicate_rows_pruned = rows_pruned.clone(); - - // Only the last predicate tracks matched rows (final result) let predicate_rows_matched = if is_last { rows_matched.clone() } else { metrics::Count::new() }; - DatafusionArrowPredicate::try_new( - candidate, + candidate.clone(), predicate_rows_pruned, predicate_rows_matched, time.clone(), @@ -1079,7 +1108,7 @@ pub fn build_row_filter( .map(|pred| Box::new(pred) as _) }) .collect::, _>>() - .map(|filters| Some(RowFilter::new(filters))) + .map(RowFilter::new) } #[cfg(test)] From 88d6ed8bceb8ff74a8701d8290ce238e72ac666c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 22:33:22 +0200 Subject: [PATCH 5/7] perf(parquet): cache projection read plan + output schema once per scan Two more pieces of per-open CPU work eliminated: 1. **`build_projection_read_plan`** walks the projection expressions, resolves each against the file schema, and builds a `ProjectionMask` + projected `Schema`. Deterministic for a given (projection, physical_file_schema, parquet_schema) triple. Build once in `prepare_filters`, store on `FiltersPreparedParquetOpen`, share with sibling stealers via `ParquetOpenChunk`. `build_stream` now uses the cached `read_plan` instead of re-running the builder. 2. **`output_schema`** (`self.projection.project_schema(self.table_schema)`) was computed twice per file open: once in `prepare_open_file`, once in `build_stealer_state`. Compute it once at `ParquetMorselizer` construction (in `ParquetSource::create_morselizer`) and `Arc::clone` from there. Together with the earlier row-filter-candidate sharing, stealers' `build_stream` now only does: metric binding (cheap), page-index pruning on 1 RG (cheap), access-plan prepare, decoder builder wiring, and the reader setup. No more redundant schema walks. Tests pass (103 lib + 200 integration); clippy clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 53 +++++++++++++-------- datafusion/datasource-parquet/src/source.rs | 5 ++ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index d0a9ad5a5abec..f6c95fa1e3932 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,8 +19,8 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::{ - FilterCandidate, build_projection_read_plan, build_row_filter_candidates, - row_filter_from_candidates, + FilterCandidate, ParquetReadPlan, build_projection_read_plan, + build_row_filter_candidates, row_filter_from_candidates, }; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ @@ -145,6 +145,9 @@ pub(super) struct ParquetMorselizer { /// and donate all-but-one back onto this queue so idle siblings steal /// them instead of sitting idle behind a single hot file. pub shared_work_source: Option, + /// Output schema (table schema with projection applied). Computed + /// once at morselizer construction so every file open reuses it. + pub output_schema: SchemaRef, } impl fmt::Debug for ParquetMorselizer { @@ -330,6 +333,7 @@ pub(crate) struct ParquetOpenChunk { pub pruning_predicate: Option>, pub page_pruning_predicate: Option>, pub row_filter_candidates: Option>>, + pub read_plan: Arc, } impl fmt::Debug for ParquetOpenChunk { @@ -362,6 +366,11 @@ struct FiltersPreparedParquetOpen { /// redo the schema-walk + ProjectionMask build. Only the cheap /// per-open metric binding happens at `build_stream` time. row_filter_candidates: Option>>, + /// Projection read plan (ProjectionMask + projected schema), + /// deterministic for a given (projection, physical_file_schema, + /// parquet_schema) triple. Built once here and reused in + /// `build_stream` so stealers don't redo the schema walk. + read_plan: Arc, } /// State of [`ParquetOpenState`] @@ -627,13 +636,8 @@ impl ParquetMorselizer { &self.metrics, )?; - // Calculate the output schema from the original projection (before literal replacement) - // so we get correct field names from column references let logical_file_schema = Arc::clone(self.table_schema.file_schema()); - let output_schema = Arc::new( - self.projection - .project_schema(self.table_schema.table_schema())?, - ); + let output_schema = Arc::clone(&self.output_schema); // Build a combined map for replacing column references with literal values. // This includes: @@ -764,10 +768,7 @@ impl ParquetMorselizer { .with_category(MetricCategory::Rows) .global_counter("num_predicate_creation_errors"); let logical_file_schema = Arc::clone(self.table_schema.file_schema()); - let output_schema = Arc::new( - self.projection - .project_schema(self.table_schema.table_schema())?, - ); + let output_schema = Arc::clone(&self.output_schema); let ParquetOpenChunk { access_plan, @@ -779,6 +780,7 @@ impl ParquetMorselizer { pruning_predicate, page_pruning_predicate, row_filter_candidates, + read_plan, } = chunk; let prepared = PreparedParquetOpen { @@ -827,6 +829,7 @@ impl ParquetMorselizer { pruning_predicate, page_pruning_predicate, row_filter_candidates, + read_plan, }, row_groups: RowGroupAccessPlanFilter::new(access_plan), }) @@ -1036,6 +1039,14 @@ impl MetadataLoadedParquetOpen { None }; + // Projection read plan (ProjectionMask + projected schema) also + // shareable across opens. + let read_plan = Arc::new(build_projection_read_plan( + prepared.projection.expr_iter(), + &physical_file_schema, + reader_metadata.parquet_schema(), + )); + Ok(FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen { prepared, @@ -1045,6 +1056,7 @@ impl MetadataLoadedParquetOpen { pruning_predicate, page_pruning_predicate, row_filter_candidates, + read_plan, }) } } @@ -1321,6 +1333,7 @@ impl RowGroupsPrunedParquetOpen { pruning_predicate: self.prepared.pruning_predicate.clone(), page_pruning_predicate: self.prepared.page_pruning_predicate.clone(), row_filter_candidates: self.prepared.row_filter_candidates.clone(), + read_plan: Arc::clone(&self.prepared.read_plan), }; let keep_idx = eligible[0]; @@ -1349,6 +1362,7 @@ impl RowGroupsPrunedParquetOpen { pruning_predicate: _, page_pruning_predicate, row_filter_candidates, + read_plan, } = prepared; let MetadataLoadedParquetOpen { prepared, @@ -1408,15 +1422,10 @@ impl RowGroupsPrunedParquetOpen { } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let read_plan = build_projection_read_plan( - prepared.projection.expr_iter(), - &prepared.physical_file_schema, - reader_metadata.parquet_schema(), - ); let mut decoder_builder = ParquetPushDecoderBuilder::new_with_metadata(reader_metadata) - .with_projection(read_plan.projection_mask) + .with_projection(read_plan.projection_mask.clone()) .with_batch_size(prepared.batch_size) .with_metrics(arrow_reader_metrics.clone()); @@ -1449,7 +1458,7 @@ impl RowGroupsPrunedParquetOpen { // Check if we need to replace the schema to handle things like differing nullability or metadata. // See note below about file vs. output schema. - let stream_schema = read_plan.projected_schema; + let stream_schema = Arc::clone(&read_plan.projected_schema); let replace_schema = stream_schema != prepared.output_schema; // Rebase column indices to match the narrowed stream schema. @@ -2080,6 +2089,11 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; + let output_schema = Arc::new( + projection + .project_schema(table_schema.table_schema()) + .expect("project_schema"), + ); ParquetMorselizer { partition_index: self.partition_index, projection, @@ -2108,6 +2122,7 @@ mod test { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, shared_work_source: self.shared_work_source, + output_schema, } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b08e99656e0ac..2a9332df8b644 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -554,6 +554,10 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); + let output_schema = Arc::new( + self.projection + .project_schema(self.table_schema.table_schema())?, + ); Ok(Box::new(ParquetMorselizer { partition_index: partition, projection: self.projection.clone(), @@ -582,6 +586,7 @@ impl FileSource for ParquetSource { max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, shared_work_source, + output_schema, })) } From 26f09e43363279eb519d74d106f4ca6ed13b7dbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 21 Apr 2026 22:39:13 +0200 Subject: [PATCH 6/7] chore: drop narrating comment above read_plan cache build The field docstring on `FiltersPreparedParquetOpen.read_plan` already explains the purpose; the inline comment duplicated it. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f6c95fa1e3932..fd37c6bf3040f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1039,8 +1039,6 @@ impl MetadataLoadedParquetOpen { None }; - // Projection read plan (ProjectionMask + projected schema) also - // shareable across opens. let read_plan = Arc::new(build_projection_read_plan( prepared.projection.expr_iter(), &physical_file_schema, From 52d6bce6e99b9fba1d98d109fa4b22903340c13d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 22 Apr 2026 15:22:03 +0200 Subject: [PATCH 7/7] fix(datasource): don't exit idle siblings while a donor is still pre-scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before: when every shared file is popped but no donor has reached its split point yet, an idle sibling saw empty queues and returned Done. Any row groups the donor subsequently pushed to the morsel queue were missed by that sibling. Now SharedWorkSource tracks an in-flight donor count via a FileLease RAII guard. Idle siblings that find both queues empty check the count and, if non-zero, wake_by_ref + Poll::Pending to re-poll. The lease drops at the morsel-to-reader transition — once a file is streaming, the donation window is closed, so we don't block siblings on the donor's assigned row groups. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datasource/src/file_stream/scan_state.rs | 35 +++++- .../datasource/src/file_stream/work_source.rs | 106 +++++++++++++++++- 2 files changed, 132 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 21125cd08896c..2f53a3dedfc2c 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -26,7 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; -use super::work_source::WorkSource; +use super::work_source::{FileLease, PopResult, WorkSource}; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -81,6 +81,12 @@ pub(super) struct ScanState { /// Once the I/O completes, yields the next planner and is pushed back /// onto `ready_planners`. pending_planner: Option, + /// Lease on the current file popped from a shared work source. While + /// held, idle siblings will wait for potential donations from this + /// file instead of declaring the shared source drained. `None` for + /// files that came from a local work source or for pre-finalized + /// morsels. + current_file_lease: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, } @@ -102,6 +108,7 @@ impl ScanState { ready_morsels: Default::default(), reader: None, pending_planner: None, + current_file_lease: None, metrics, } } @@ -146,6 +153,7 @@ impl ScanState { return match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -174,6 +182,7 @@ impl ScanState { let batch = batch.slice(0, *remain); let done = 1 + self.work_source.skipped_on_limit(); self.metrics.files_processed.add(done); + self.current_file_lease = None; *remain = 0; (batch, true) } @@ -197,6 +206,7 @@ impl ScanState { return match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -205,6 +215,7 @@ impl ScanState { Poll::Ready(None) => { self.reader = None; self.metrics.files_processed.add(1); + self.current_file_lease = None; self.metrics.time_scanning_until_data.stop(); self.metrics.time_scanning_total.stop(); return ScanAndReturn::Continue; @@ -218,6 +229,11 @@ impl ScanState { self.metrics.time_scanning_until_data.start(); self.metrics.time_scanning_total.start(); self.reader = Some(morsel.into_stream()); + // A morsel is now streaming, so we're past the pre-scan window + // where donations happen. Release the in-flight donor slot so + // idle siblings can make progress (or exit) without waiting on + // this stream to finish its assigned row groups. + self.current_file_lease = None; return ScanAndReturn::Continue; } @@ -248,6 +264,7 @@ impl ScanState { } Ok(None) => { self.metrics.files_processed.add(1); + self.current_file_lease = None; self.metrics.time_opening.stop(); ScanAndReturn::Continue } @@ -257,6 +274,7 @@ impl ScanState { match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -266,10 +284,18 @@ impl ScanState { } // No outstanding work remains, so begin planning the next unopened file. - let part_file = match self.work_source.pop_front() { - Some(part_file) => part_file, - None => return ScanAndReturn::Done(None), + let (part_file, lease) = match self.work_source.pop_front() { + PopResult::Ready(file, lease) => (file, lease), + PopResult::Pending => { + // A sibling is pre-scan on a shared file that may still + // donate morsels. Re-schedule ourselves so we re-check the + // queues as soon as the scheduler picks us up. + cx.waker().wake_by_ref(); + return ScanAndReturn::Return(Poll::Pending); + } + PopResult::Done => return ScanAndReturn::Done(None), }; + self.current_file_lease = lease; self.metrics.time_opening.start(); match self.morselizer.plan_file(part_file) { @@ -283,6 +309,7 @@ impl ScanState { self.metrics.file_open_errors.add(1); self.metrics.time_opening.stop(); self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 41ebdbb6efa80..85ee9d1f0727d 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::PartitionedFile; use crate::file_groups::FileGroup; @@ -37,11 +38,18 @@ pub(super) enum WorkSource { } impl WorkSource { - /// Pop the next file to plan from this work source. - pub(super) fn pop_front(&mut self) -> Option { + /// Try to pop the next item of work. + /// + /// Returns [`PopResult::Pending`] for [`WorkSource::Shared`] when both + /// queues are empty but a sibling is still processing a file that may + /// donate more work. The caller must yield with its waker re-scheduled. + pub(super) fn pop_front(&mut self) -> PopResult { match self { - Self::Local(files) => files.pop_front(), - Self::Shared(shared) => shared.pop_front(), + Self::Local(files) => match files.pop_front() { + Some(file) => PopResult::Ready(file, None), + None => PopResult::Done, + }, + Self::Shared(shared) => shared.pop_front_tracked(), } } @@ -55,6 +63,25 @@ impl WorkSource { } } +/// Outcome of a pop attempt against a [`WorkSource`]. +#[derive(Debug)] +#[expect( + clippy::large_enum_variant, + reason = "Ready carries a PartitionedFile on the common path; boxing would add a heap alloc per pop and the other variants are markers" +)] +pub(super) enum PopResult { + /// Work popped. The optional [`FileLease`] must be held until the file + /// is fully processed; while it is alive, idle siblings treat the + /// shared source as "donor may still publish" instead of drained. + Ready(PartitionedFile, Option), + /// No work currently available, but a sibling is still pre-scan on a + /// file that may donate. The caller must register its waker to be + /// re-polled and yield [`Poll::Pending`]. + Pending, + /// No work available and no donors in flight — fully drained. + Done, +} + /// Shared source of work for sibling `FileStream`s. /// /// Created once per execution and shared by all reorderable sibling streams. @@ -69,6 +96,12 @@ impl WorkSource { /// state machine. Draining morsels first keeps their latency low and /// prevents siblings from starting fresh whole files while half-processed /// sub-file work sits idle. +/// +/// Also tracks an in-flight donor count: every file popped from `files` is +/// backed by a [`FileLease`] whose `Drop` decrements the count. While the +/// count is non-zero, an idle sibling that sees both queues empty must wait +/// rather than declare the source drained — the donor is still pre-scan +/// and may yet push morsels. #[derive(Debug, Clone)] pub struct SharedWorkSource { inner: Arc, @@ -78,6 +111,9 @@ pub struct SharedWorkSource { pub(super) struct SharedWorkSourceInner { morsels: Mutex>, files: Mutex>, + /// Number of files popped from `files` whose [`FileLease`] has not yet + /// been dropped. Non-zero means "donor may still publish morsels." + in_flight: AtomicUsize, } impl Default for SharedWorkSource { @@ -94,6 +130,7 @@ impl SharedWorkSource { inner: Arc::new(SharedWorkSourceInner { morsels: Mutex::new(VecDeque::new()), files: Mutex::new(files), + in_flight: AtomicUsize::new(0), }), } } @@ -106,7 +143,9 @@ impl SharedWorkSource { /// Pop the next item of work — morsels (pre-prepared sub-file chunks) /// first, then whole files. /// - /// Returns `None` if both queues are empty. + /// Returns `None` if both queues are empty. Does *not* track in-flight + /// donors; intended for tests and callers that only observe morsel + /// donations. `ScanState` uses [`Self::pop_front_tracked`]. pub fn pop_front(&self) -> Option { if let Some(morsel) = self.inner.morsels.lock().pop_front() { return Some(morsel); @@ -114,6 +153,42 @@ impl SharedWorkSource { self.inner.files.lock().pop_front() } + /// Pop the next item of work for a sibling `FileStream`, returning a + /// [`PopResult`] that distinguishes "nothing right now but donors may + /// publish" from "truly drained." + pub(super) fn pop_front_tracked(&self) -> PopResult { + if let Some(morsel) = self.inner.morsels.lock().pop_front() { + return PopResult::Ready(morsel, None); + } + // Increment before releasing the `files` lock so a concurrent peer + // cannot observe empty-files && zero-counter while this donor is + // about to register itself. + let mut files = self.inner.files.lock(); + if let Some(file) = files.pop_front() { + self.inner.in_flight.fetch_add(1, Ordering::Release); + drop(files); + return PopResult::Ready( + file, + Some(FileLease { + inner: Arc::clone(&self.inner), + }), + ); + } + drop(files); + // Both queues empty. If any donor is still in flight, wait — it may + // yet donate morsels. + if self.inner.in_flight.load(Ordering::Acquire) > 0 { + return PopResult::Pending; + } + // Counter observed zero. A donor that donated before dropping its + // lease must have pushed morsels before the decrement; re-peek the + // morsel queue to pick them up rather than exit prematurely. + if let Some(morsel) = self.inner.morsels.lock().pop_front() { + return PopResult::Ready(morsel, None); + } + PopResult::Done + } + /// Push pre-prepared morsels onto the morsel queue. /// /// Used when an in-flight file is sub-divided (e.g. parquet row-group @@ -125,3 +200,24 @@ impl SharedWorkSource { queue.extend(items); } } + +/// RAII guard tracking a file popped from a [`SharedWorkSource`]'s `files` +/// queue. While alive, the counter on the source stays non-zero, which +/// keeps idle sibling streams waiting for potential donations instead of +/// declaring the source drained. Dropped when the donor finishes (or +/// gives up on) the file. +pub(super) struct FileLease { + inner: Arc, +} + +impl std::fmt::Debug for FileLease { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileLease").finish_non_exhaustive() + } +} + +impl Drop for FileLease { + fn drop(&mut self) { + self.inner.in_flight.fetch_sub(1, Ordering::Release); + } +}