Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_common::{
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
Expand Down Expand Up @@ -339,6 +340,28 @@ impl FileOpener for ParquetOpener {

file_metrics.files_ranges_pruned_statistics.add_matched(1);

// ---------------------------------------------------------------
// Step: update dynamic filter bounds from file-level statistics
// ---------------------------------------------------------------

// If the predicate contains a dynamic filter with a file stats
// handler (e.g., from an aggregate computing min/max), update its
// bounds using this file's statistics. This tightens the filter
// earlier, enabling pruning of concurrent and subsequent files.
//
// The handler uses inclusive operators (<=/>= instead of </>)
// for file-stats-derived bounds, ensuring the boundary value
// passes through to the accumulator before being confirmed.
if let Some(predicate) = &predicate
&& let Some(statistics) = partitioned_file.statistics.as_deref()
{
update_dynamic_filter_from_file_stats(
predicate,
statistics,
&logical_file_schema,
);
}

// --------------------------------------------------------
// Step: fetch Parquet metadata (and optionally page index)
// --------------------------------------------------------
Expand Down Expand Up @@ -757,6 +780,35 @@ fn constant_value_from_stats(
None
}

/// Walk the predicate tree and update any dynamic filters' bounds
/// from the given file-level statistics.
///
/// This finds [`DynamicFilterPhysicalExpr`] nodes in the predicate and calls
/// their [`update_from_file_statistics`] method if a handler is registered.
fn update_dynamic_filter_from_file_stats(
predicate: &Arc<dyn PhysicalExpr>,
statistics: &Statistics,
file_schema: &SchemaRef,
) {
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};

let _ = predicate.apply(|expr: &Arc<dyn PhysicalExpr>| {
if let Some(dyn_filter) = expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
{
if let Err(e) =
dyn_filter.update_from_file_statistics(statistics, file_schema)
{
debug!(
"Ignoring error updating dynamic filter from file stats: {e}"
);
}
}
Ok(TreeNodeRecursion::Continue)
});
}

/// Wraps an inner RecordBatchStream and a [`FilePruner`]
///
/// This can terminate the scan early when some dynamic filters is updated after
Expand Down
81 changes: 79 additions & 2 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,34 @@ use tokio::sync::watch;
use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
use datafusion_common::{
Result,
Result, Statistics,
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::DynHash;

/// Trait for dynamic filter producers that can update their bounds
/// based on file-level statistics.
///
/// When a parquet file is opened, its file-level statistics (min/max per column)
/// can be used to update the dynamic filter bounds before any data is read.
/// This enables earlier pruning of subsequent files.
///
/// For example, an aggregate computing `min(a)` that sees a file with
/// `min_value(a) = 5` can update its dynamic filter to `a <= 5`,
/// allowing subsequent files where `min_value(a) > 5` to be skipped entirely.
pub trait DynamicFilterFileStatsHandler: Send + Sync + std::fmt::Debug {
/// Update filter bounds based on file-level statistics.
///
/// Returns `Ok(true)` if any bounds were changed and the filter was updated,
/// `Ok(false)` otherwise.
fn update_from_file_statistics(
&self,
statistics: &Statistics,
schema: &Schema,
) -> Result<bool>;
}

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
Expand Down Expand Up @@ -76,7 +98,6 @@ pub struct DynamicFilterPhysicalExpr {
nullable: Arc<RwLock<Option<bool>>>,
}

#[derive(Debug)]
struct Inner {
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
Expand All @@ -86,6 +107,25 @@ struct Inner {
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
/// Optional handler for updating bounds from file-level statistics.
/// This is set by the producer (e.g., `AggregateExec`) that creates the dynamic filter.
file_stats_handler: Option<Arc<dyn DynamicFilterFileStatsHandler>>,
}

// Manual Debug to avoid infinite recursion: AggrDynFilter → DynamicFilterPhysicalExpr
// → Inner → file_stats_handler (= AggrDynFilter) → …
impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("generation", &self.generation)
.field("expr", &self.expr)
.field("is_complete", &self.is_complete)
.field(
"file_stats_handler",
&self.file_stats_handler.as_ref().map(|_| "..."),
)
.finish()
}
}

impl Inner {
Expand All @@ -96,6 +136,7 @@ impl Inner {
generation: 1,
expr,
is_complete: false,
file_stats_handler: None,
}
}

Expand Down Expand Up @@ -242,10 +283,12 @@ impl DynamicFilterPhysicalExpr {
// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
let new_generation = current.generation + 1;
let file_stats_handler = current.file_stats_handler.clone();
*current = Inner {
generation: new_generation,
expr: new_expr,
is_complete: current.is_complete,
file_stats_handler,
};
drop(current); // Release the lock before broadcasting

Expand All @@ -272,6 +315,40 @@ impl DynamicFilterPhysicalExpr {
});
}

/// Set a handler for updating this dynamic filter's bounds from file-level statistics.
///
/// The handler will be shared across all copies of this dynamic filter
/// (including remapped copies created via `with_new_children`).
pub fn set_file_stats_handler(
&self,
handler: Arc<dyn DynamicFilterFileStatsHandler>,
) {
self.inner.write().file_stats_handler = Some(handler);
}

/// Update this dynamic filter's bounds based on file-level statistics.
///
/// This delegates to the registered [`DynamicFilterFileStatsHandler`] if one
/// has been set via [`Self::set_file_stats_handler`].
///
/// Returns `Ok(true)` if bounds were updated, `Ok(false)` if no handler
/// is registered or no bounds changed.
pub fn update_from_file_statistics(
&self,
statistics: &Statistics,
schema: &Schema,
) -> Result<bool> {
// Clone the handler under a brief read lock, then release
// before calling it (the handler may call `self.update()` which
// acquires a write lock on the same inner).
let handler = self.inner.read().file_stats_handler.clone();
if let Some(handler) = handler {
handler.update_from_file_statistics(statistics, schema)
} else {
Ok(false)
}
}

/// Wait asynchronously for any update to this filter.
///
/// This method will return when [`Self::update`] is called and the generation increases.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
pub use cast_column::CastColumnExpr;
pub use column::{Column, col, with_new_schema};
pub use datafusion_expr::utils::format_state_name;
pub use dynamic_filters::DynamicFilterPhysicalExpr;
pub use dynamic_filters::{DynamicFilterFileStatsHandler, DynamicFilterPhysicalExpr};
pub use in_list::{InListExpr, in_list};
pub use is_not_null::{IsNotNullExpr, is_not_null};
pub use is_null::{IsNullExpr, is_null};
Expand Down
Loading
Loading