diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 108e8c5752017..860eb89f1a07c 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -43,6 +43,7 @@ use datafusion_common::{
 use datafusion_datasource::{PartitionedFile, TableSchema};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
 use datafusion_physical_expr_common::physical_expr::{
     PhysicalExpr, is_dynamic_physical_expr,
 };
@@ -339,6 +340,28 @@
             file_metrics.files_ranges_pruned_statistics.add_matched(1);
 
+            // ---------------------------------------------------------------
+            // Step: update dynamic filter bounds from file-level statistics
+            // ---------------------------------------------------------------
+
+            // If the predicate contains a dynamic filter with a file stats
+            // handler (e.g., from an aggregate computing min/max), update its
+            // bounds using this file's statistics. This tightens the filter
+            // earlier, enabling pruning of concurrent and subsequent files.
+            //
+            // The handler uses inclusive operators (<=/>= instead of </>)
+            // for file-stats-derived bounds, ensuring the boundary value
+            // passes through to the accumulator before being confirmed.
+            if let Some(predicate) = &predicate
+                && let Some(statistics) = partitioned_file.statistics.as_deref()
+            {
+                update_dynamic_filter_from_file_stats(
+                    predicate,
+                    statistics,
+                    &logical_file_schema,
+                );
+            }
+
             // --------------------------------------------------------
             // Step: fetch Parquet metadata (and optionally page index)
             // --------------------------------------------------------
@@ -757,6 +780,35 @@ fn constant_value_from_stats(
     None
 }
 
+/// Walk the predicate tree and update any dynamic filters' bounds
+/// from the given file-level statistics.
+///
+/// This finds [`DynamicFilterPhysicalExpr`] nodes in the predicate and calls
+/// their [`update_from_file_statistics`] method if a handler is registered.
+fn update_dynamic_filter_from_file_stats(
+    predicate: &Arc<dyn PhysicalExpr>,
+    statistics: &Statistics,
+    file_schema: &SchemaRef,
+) {
+    use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+
+    let _ = predicate.apply(|expr: &Arc<dyn PhysicalExpr>| {
+        if let Some(dyn_filter) = expr
+            .as_any()
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+        {
+            if let Err(e) =
+                dyn_filter.update_from_file_statistics(statistics, file_schema)
+            {
+                debug!(
+                    "Ignoring error updating dynamic filter from file stats: {e}"
+                );
+            }
+        }
+        Ok(TreeNodeRecursion::Continue)
+    });
+}
+
 /// Wraps an inner RecordBatchStream and a [`FilePruner`]
 ///
 /// This can terminate the scan early when some dynamic filters is updated after
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index d285f8b377eca..335453421a366 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -22,12 +22,34 @@ use tokio::sync::watch;
 use crate::PhysicalExpr;
 use arrow::datatypes::{DataType, Schema};
 use datafusion_common::{
-    Result,
+    Result, Statistics,
     tree_node::{Transformed, TransformedResult, TreeNode},
 };
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::DynHash;
 
+/// Trait for dynamic filter producers that can update their bounds
+/// based on file-level statistics.
+///
+/// When a parquet file is opened, its file-level statistics (min/max per column)
+/// can be used to update the dynamic filter bounds before any data is read.
+/// This enables earlier pruning of subsequent files.
+///
+/// For example, an aggregate computing `min(a)` that sees a file with
+/// `min_value(a) = 5` can update its dynamic filter to `a <= 5`,
+/// allowing subsequent files where `min_value(a) > 5` to be skipped entirely.
+pub trait DynamicFilterFileStatsHandler: Send + Sync + std::fmt::Debug {
+    /// Update filter bounds based on file-level statistics.
+    ///
+    /// Returns `Ok(true)` if any bounds were changed and the filter was updated,
+    /// `Ok(false)` otherwise.
+    fn update_from_file_statistics(
+        &self,
+        statistics: &Statistics,
+        schema: &Schema,
+    ) -> Result<bool>;
+}
+
 /// State of a dynamic filter, tracking both updates and completion.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 enum FilterState {
@@ -76,7 +98,6 @@ pub struct DynamicFilterPhysicalExpr {
     nullable: Arc<RwLock<Option<bool>>>,
 }
 
-#[derive(Debug)]
 struct Inner {
     /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
     /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
@@ -86,6 +107,25 @@ struct Inner {
     /// This is redundant with the watch channel state, but allows us to return immediately
     /// from `wait_complete()` without subscribing if already complete.
     is_complete: bool,
+    /// Optional handler for updating bounds from file-level statistics.
+    /// This is set by the producer (e.g., `AggregateExec`) that creates the dynamic filter.
+    file_stats_handler: Option<Arc<dyn DynamicFilterFileStatsHandler>>,
+}
+
+// Manual Debug to avoid infinite recursion: AggrDynFilter → DynamicFilterPhysicalExpr
+// → Inner → file_stats_handler (= AggrDynFilter) → …
+impl std::fmt::Debug for Inner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Inner")
+            .field("generation", &self.generation)
+            .field("expr", &self.expr)
+            .field("is_complete", &self.is_complete)
+            .field(
+                "file_stats_handler",
+                &self.file_stats_handler.as_ref().map(|_| "..."),
+            )
+            .finish()
+    }
 }
 
 impl Inner {
@@ -96,6 +136,7 @@ impl Inner {
             generation: 1,
             expr,
             is_complete: false,
+            file_stats_handler: None,
         }
     }
@@ -242,10 +283,12 @@ impl DynamicFilterPhysicalExpr {
         // Load the current inner, increment generation, and store the new one
         let mut current = self.inner.write();
         let new_generation = current.generation + 1;
+        let file_stats_handler = current.file_stats_handler.clone();
         *current = Inner {
             generation: new_generation,
             expr: new_expr,
             is_complete: current.is_complete,
+            file_stats_handler,
         };
         drop(current); // Release the lock before broadcasting
@@ -272,6 +315,40 @@
         });
     }
 
+    /// Set a handler for updating this dynamic filter's bounds from file-level statistics.
+    ///
+    /// The handler will be shared across all copies of this dynamic filter
+    /// (including remapped copies created via `with_new_children`).
+    pub fn set_file_stats_handler(
+        &self,
+        handler: Arc<dyn DynamicFilterFileStatsHandler>,
+    ) {
+        self.inner.write().file_stats_handler = Some(handler);
+    }
+
+    /// Update this dynamic filter's bounds based on file-level statistics.
+    ///
+    /// This delegates to the registered [`DynamicFilterFileStatsHandler`] if one
+    /// has been set via [`Self::set_file_stats_handler`].
+    ///
+    /// Returns `Ok(true)` if bounds were updated, `Ok(false)` if no handler
+    /// is registered or no bounds changed.
+    pub fn update_from_file_statistics(
+        &self,
+        statistics: &Statistics,
+        schema: &Schema,
+    ) -> Result<bool> {
+        // Clone the handler under a brief read lock, then release
+        // before calling it (the handler may call `self.update()` which
+        // acquires a write lock on the same inner).
+        let handler = self.inner.read().file_stats_handler.clone();
+        if let Some(handler) = handler {
+            handler.update_from_file_statistics(statistics, schema)
+        } else {
+            Ok(false)
+        }
+    }
+
     /// Wait asynchronously for any update to this filter.
     ///
     /// This method will return when [`Self::update`] is called and the generation increases.
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index c9e02708d6c28..ac1006b5127f5 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
 pub use cast_column::CastColumnExpr;
 pub use column::{Column, col, with_new_schema};
 pub use datafusion_expr::utils::format_state_name;
-pub use dynamic_filters::DynamicFilterPhysicalExpr;
+pub use dynamic_filters::{DynamicFilterFileStatsHandler, DynamicFilterPhysicalExpr};
 pub use in_list::{InListExpr, in_list};
 pub use is_not_null::{IsNotNullExpr, is_not_null};
 pub use is_null::{IsNullExpr, is_null};
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 561e9d1d05e89..1d9ea4849bcfd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -53,7 +53,10 @@ use datafusion_execution::TaskContext;
 use datafusion_expr::{Accumulator, Aggregate};
 use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
+use datafusion_physical_expr::expressions::{
+    Column, DynamicFilterFileStatsHandler, DynamicFilterPhysicalExpr, lit,
+};
+use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
 use datafusion_physical_expr::{
     ConstExpr, EquivalenceProperties, physical_exprs_contains,
 };
@@ -574,8 +577,20 @@ struct PerAccumulatorDynFilter {
     /// for no grouping aggregate execution), and this index is into `aggregate_expressions`
     /// vec inside `AggregateStreamInner`
     aggr_index: usize,
-    // The current bound. Shared among all streams.
+    /// The column expression this accumulator operates on (e.g., `col("a")` for `min(a)`).
+    column: Arc<dyn PhysicalExpr>,
+    /// The data type the accumulator operates on. This is the type after any
+    /// projections/casts (e.g., Date32 if a view casts UInt16 to Date32).
+    /// Used to validate file statistics types in [`AggrDynFilter::update_from_file_statistics`].
+    data_type: DataType,
+    /// The current bound. Shared among all streams.
     shared_bound: Arc<Mutex<ScalarValue>>,
+    /// Whether the current bound has been confirmed by actual data processing
+    /// (not just from file-level statistics). When `false`, the filter uses
+    /// inclusive comparison (`<=`/`>=`) to ensure the boundary value is not
+    /// filtered out before the accumulator sees it. When `true`, strict
+    /// comparison (`<`/`>`) is used for more aggressive pruning.
+    bound_from_data: Arc<AtomicBool>,
 }
 
 /// Aggregate types that are supported for dynamic filter in `AggregateExec`
@@ -585,6 +600,143 @@ enum DynamicFilterAggregateType {
     Max,
 }
 
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::BinaryExpr;
+
+impl AggrDynFilter {
+    /// Build the filter predicate expression from current bounds.
+    ///
+    /// When `bound_from_data` is true for an accumulator, uses strict comparison
+    /// (`<`/`>`) for more aggressive pruning. When false (bound from file stats),
+    /// uses inclusive comparison (`<=`/`>=`) to ensure the boundary value is not
+    /// filtered out before the accumulator sees it.
+    fn build_predicate(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        let mut predicates: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(self.supported_accumulators_info.len());
+
+        for acc_info in &self.supported_accumulators_info {
+            let bound = {
+                let guard = acc_info.shared_bound.lock();
+                if (*guard).is_null() {
+                    continue;
+                }
+                guard.clone()
+            };
+
+            let literal = lit(bound);
+            let from_data = acc_info.bound_from_data.load(AtomicOrdering::Relaxed);
+            let predicate: Arc<dyn PhysicalExpr> = match acc_info.aggr_type {
+                DynamicFilterAggregateType::Min => {
+                    let op = if from_data {
+                        Operator::Lt
+                    } else {
+                        Operator::LtEq
+                    };
+                    Arc::new(BinaryExpr::new(
+                        Arc::clone(&acc_info.column),
+                        op,
+                        literal,
+                    ))
+                }
+                DynamicFilterAggregateType::Max => {
+                    let op = if from_data {
+                        Operator::Gt
+                    } else {
+                        Operator::GtEq
+                    };
+                    Arc::new(BinaryExpr::new(
+                        Arc::clone(&acc_info.column),
+                        op,
+                        literal,
+                    ))
+                }
+            };
+            predicates.push(predicate);
+        }
+
+        let combined = predicates.into_iter().reduce(|acc, pred| {
+            Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc<dyn PhysicalExpr>
+        });
+
+        Ok(combined.unwrap_or_else(|| lit(true)))
+    }
+}
+
+impl DynamicFilterFileStatsHandler for AggrDynFilter {
+    fn update_from_file_statistics(
+        &self,
+        statistics: &Statistics,
+        schema: &Schema,
+    ) -> Result<bool> {
+        let mut bounds_changed = false;
+
+        for acc_info in &self.supported_accumulators_info {
+            let col = acc_info
+                .column
+                .as_any()
+                .downcast_ref::<Column>()
+                .ok_or_else(|| {
+                    datafusion_common::internal_datafusion_err!(
+                        "Expected Column expression in aggregate dynamic filter"
+                    )
+                })?;
+
+            // Look up column in the file schema by name
+            let col_idx = match schema.index_of(col.name()) {
+                Ok(idx) if idx < statistics.column_statistics.len() => idx,
+                _ => continue,
+            };
+
+            // Skip if the file column type doesn't match the accumulator's type.
+            // This can happen when a view applies a CAST (e.g., UInt16 → Date32),
+            // making file statistics incompatible with accumulator bounds.
+            let file_col_type = schema.field(col_idx).data_type();
+            if file_col_type != &acc_info.data_type {
+                continue;
+            }
+
+            let col_stats = &statistics.column_statistics[col_idx];
+
+            // Extract the relevant statistic based on aggregate type
+            let stat_value = match acc_info.aggr_type {
+                DynamicFilterAggregateType::Min => {
+                    col_stats.min_value.get_value().cloned()
+                }
+                DynamicFilterAggregateType::Max => {
+                    col_stats.max_value.get_value().cloned()
+                }
+            };
+
+            if let Some(stat_value) = stat_value {
+                let mut bound = acc_info.shared_bound.lock();
+                let new_bound = match acc_info.aggr_type {
+                    DynamicFilterAggregateType::Min => {
+                        no_grouping::scalar_min(&bound, &stat_value)?
+                    }
+                    DynamicFilterAggregateType::Max => {
+                        no_grouping::scalar_max(&bound, &stat_value)?
+                    }
+                };
+                if new_bound != *bound {
+                    *bound = new_bound;
+                    // Mark as NOT from data since this came from file stats
+                    acc_info
+                        .bound_from_data
+                        .store(false, AtomicOrdering::Relaxed);
+                    bounds_changed = true;
+                }
+            }
+        }
+
+        if bounds_changed {
+            let predicate = self.build_predicate()?;
+            self.filter.update(predicate)?;
+        }
+
+        Ok(bounds_changed)
+    }
+}
+
 /// Configuration for limit-based optimizations in aggregation
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct LimitOptions {
@@ -1153,22 +1305,35 @@ impl AggregateExec {
             // 2. arg should be only 1 column reference
             if let [arg] = aggr_expr.expressions().as_slice()
-                && arg.as_any().is::<Column>()
+                && let Some(col) = arg.as_any().downcast_ref::<Column>()
             {
+                let col_data_type = self.input_schema.field(col.index()).data_type().clone();
                 all_cols.push(Arc::clone(arg));
                 aggr_dyn_filters.push(PerAccumulatorDynFilter {
                     aggr_type,
                     aggr_index: i,
+                    column: Arc::clone(arg),
+                    data_type: col_data_type,
                     shared_bound: Arc::new(Mutex::new(ScalarValue::Null)),
+                    bound_from_data: Arc::new(AtomicBool::new(false)),
                 });
             }
         }
 
         if !aggr_dyn_filters.is_empty() {
-            self.dynamic_filter = Some(Arc::new(AggrDynFilter {
-                filter: Arc::new(DynamicFilterPhysicalExpr::new(all_cols, lit(true))),
+            let aggr_dyn_filter = Arc::new(AggrDynFilter {
+                filter: Arc::new(DynamicFilterPhysicalExpr::new(
+                    all_cols,
+                    lit(true),
+                )),
                 supported_accumulators_info: aggr_dyn_filters,
-            }))
+            });
+            // Register the file stats handler so the parquet opener can
+            // update bounds based on file-level min/max statistics.
+            aggr_dyn_filter
+                .filter
+                .set_file_stats_handler(Arc::clone(&aggr_dyn_filter) as _);
+            self.dynamic_filter = Some(aggr_dyn_filter);
         }
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index a7dd7c9a66cb1..5451b2410ff46 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -26,11 +26,9 @@ use crate::metrics::{BaselineMetrics, RecordOutput};
 use crate::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{Result, ScalarValue, internal_datafusion_err, internal_err};
+use datafusion_common::{Result, ScalarValue, internal_datafusion_err};
 use datafusion_execution::TaskContext;
-use datafusion_expr::Operator;
 use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_expr::expressions::{BinaryExpr, lit};
 use futures::stream::BoxStream;
 use std::borrow::Cow;
 use std::cmp::Ordering;
@@ -76,92 +74,17 @@ struct AggregateStreamInner {
 }
 
 impl AggregateStreamInner {
-    // TODO: check if we get Null handling correct
-    /// # Examples
-    /// - Example 1
-    ///   Accumulators: min(c1)
-    ///   Current Bounds: min(c1)=10
-    ///   --> dynamic filter PhysicalExpr: c1 < 10
-    ///
-    /// - Example 2
-    ///   Accumulators: min(c1), max(c1), min(c2)
-    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
-    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
-    ///
-    /// # Errors
-    /// Returns internal errors if the dynamic filter is not enabled, or other
-    /// invariant check fails.
-    fn build_dynamic_filter_from_accumulator_bounds(
-        &self,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
-            return internal_err!(
-                "`build_dynamic_filter_from_accumulator_bounds()` is only called when dynamic filter is enabled"
-            );
-        };
-
-        let mut predicates: Vec<Arc<dyn PhysicalExpr>> =
-            Vec::with_capacity(filter_state.supported_accumulators_info.len());
-
-        for acc_info in &filter_state.supported_accumulators_info {
-            // Skip if we don't yet have a meaningful bound
-            let bound = {
-                let guard = acc_info.shared_bound.lock();
-                if (*guard).is_null() {
-                    continue;
-                }
-                guard.clone()
-            };
-
-            let agg_exprs = self
-                .aggregate_expressions
-                .get(acc_info.aggr_index)
-                .ok_or_else(|| {
-                    internal_datafusion_err!(
-                        "Invalid aggregate expression index {} for dynamic filter",
-                        acc_info.aggr_index
-                    )
-                })?;
-            // Only aggregates with a single argument are supported.
-            let column_expr = agg_exprs.first().ok_or_else(|| {
-                internal_datafusion_err!(
-                    "Aggregate expression at index {} expected a single argument",
-                    acc_info.aggr_index
-                )
-            })?;
-
-            let literal = lit(bound);
-            let predicate: Arc<dyn PhysicalExpr> = match acc_info.aggr_type {
-                DynamicFilterAggregateType::Min => Arc::new(BinaryExpr::new(
-                    Arc::clone(column_expr),
-                    Operator::Lt,
-                    literal,
-                )),
-                DynamicFilterAggregateType::Max => Arc::new(BinaryExpr::new(
-                    Arc::clone(column_expr),
-                    Operator::Gt,
-                    literal,
-                )),
-            };
-            predicates.push(predicate);
-        }
-
-        let combined = predicates.into_iter().reduce(|acc, pred| {
-            Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc<dyn PhysicalExpr>
-        });
-
-        Ok(combined.unwrap_or_else(|| lit(true)))
-    }
-
     // If the dynamic filter is enabled, update it using the current accumulator's
     // values
     fn maybe_update_dyn_filter(&mut self) -> Result<()> {
+        use std::sync::atomic::Ordering as AtomicOrdering;
+
         // Step 1: Update each partition's current bound
         let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
             return Ok(());
         };
 
-        let mut bounds_changed = false;
+        let mut needs_update = false;
 
         for acc_info in &filter_state.supported_accumulators_info {
            let acc =
@@ -188,15 +111,45 @@ impl AggregateStreamInner {
             };
             if new_bound != *bound {
                 *bound = new_bound;
-                bounds_changed = true;
+                // The bound came from actual data, so we can use
+                // strict comparison for better pruning.
+                acc_info
+                    .bound_from_data
+                    .store(true, AtomicOrdering::Relaxed);
+                needs_update = true;
+            }
+            // Even if the bound value didn't change, the accumulator may have
+            // confirmed a file-stats-derived bound (transitioning from <=/>= to </>).
+            if !acc_info.bound_from_data.load(AtomicOrdering::Relaxed)
+                && !current_bound.is_null()
+            {
+                let shared = acc_info.shared_bound.lock().clone();
+                let confirmed = match acc_info.aggr_type {
+                    DynamicFilterAggregateType::Min => {
+                        // If accumulator min <= shared bound, accumulator has
+                        // confirmed the bound value exists in actual data.
+                        current_bound.partial_cmp(&shared)
+                            != Some(Ordering::Greater)
+                    }
+                    DynamicFilterAggregateType::Max => {
+                        current_bound.partial_cmp(&shared)
+                            != Some(Ordering::Less)
+                    }
+                };
+                if confirmed {
+                    acc_info
+                        .bound_from_data
+                        .store(true, AtomicOrdering::Relaxed);
+                    needs_update = true;
+                }
             }
         }
 
         // Step 2: Sync the dynamic filter physical expression with reader,
-        // but only if any bound actually changed.
-        if bounds_changed {
-            let predicate = self.build_dynamic_filter_from_accumulator_bounds()?;
+        // but only if any bound actually changed or operator switched.
+        if needs_update {
+            let predicate = filter_state.build_predicate()?;
             filter_state.filter.update(predicate)?;
         }
@@ -213,7 +166,7 @@
 ///
 /// # Errors
 /// Returns internal error if v1 and v2 has incompatible types.
-fn scalar_min(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
+pub(super) fn scalar_min(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
     if let Some(result) = scalar_cmp_null_short_circuit(v1, v2) {
         return Ok(result);
     }
@@ -236,7 +189,7 @@
 ///
 /// # Errors
 /// Returns internal error if v1 and v2 has incompatible types.
-fn scalar_max(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
+pub(super) fn scalar_max(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
     if let Some(result) = scalar_cmp_null_short_circuit(v1, v2) {
         return Ok(result);
     }