From df663f971493f9e909819be7609c922841de2ae0 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 22 Dec 2025 17:15:44 +0800 Subject: [PATCH 01/11] cp --- .../physical-expr-common/src/physical_expr.rs | 61 +++++++++++++++++++ datafusion/pruning/src/lib.rs | 1 + datafusion/pruning/src/pruner.rs | 19 ++++++ 3 files changed, 81 insertions(+) create mode 100644 datafusion/pruning/src/pruner.rs diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 2358a21940912..f55f52dc970bb 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -430,6 +430,67 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn is_volatile_node(&self) -> bool { false } + + // --------------- + // Pruning related + // --------------- + // None means propagation not implemented/supported for this node + fn propagate_range_stats( + child_range_stats: &[RangeStats], + ) -> Result> { + Ok(None) + } + + fn propagate_null_stats( + child_range_stats: &[NullStats], + ) -> Result> { + Ok(None) + } + + fn evaluate_pruning(&self) -> Result { + // This will be the default impl for stats propagation nodes (like arithmetic + // expressions) + // 1. Evaluate pruning for all child + // 2. Extract all range_stats from the child, call self's `propagagte_range_stats` + // , and build the output RangeStats + // 3. ditto for NullStats + // 4. Finally build the PruningIntermediate for current node + unimplemented!() + } +} + +// Pruner Common +/// e.g. 
for x > 5 +/// bucket 1 has stat [10,15] -> AlwaysTrue +/// bucket 2 has stat [0,5] -> AlwaysFalse +/// bucket 3 has stat [0,10] -> Unknown +pub enum PruningResult { + AlwaysTrue, + AlwaysFalse, + Unknown, +} + +pub struct RangeStats { + mins: Option, + maxs: Option, + length: usize, +} + +pub struct NullStats { + null_counts: Option, + row_counts: Option, + length: usize, +} + +pub struct ColumnStats { + range_stats: Option, + null_stats: Option, +} + +pub enum PruningIntermediate { + IntermediateStats(ColumnStats), + IntermediateResult(PruningResult), + Unsupported, } #[deprecated( diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index 9f8142447ba69..318645d4e8283 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -19,6 +19,7 @@ #![deny(clippy::allow_attributes)] mod file_pruner; +pub mod pruner; mod pruning_predicate; pub use file_pruner::FilePruner; diff --git a/datafusion/pruning/src/pruner.rs b/datafusion/pruning/src/pruner.rs new file mode 100644 index 0000000000000..1bdcec0e0cda1 --- /dev/null +++ b/datafusion/pruning/src/pruner.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::ArrayRef; + From 5b34bc5d52919efec03d475bfe82e8808bd6a2e7 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 22 Dec 2025 17:34:31 +0800 Subject: [PATCH 02/11] cp --- .../physical-expr-common/src/physical_expr.rs | 201 +++++++++++++++++- datafusion/pruning/src/pruner.rs | 1 - 2 files changed, 191 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index f55f52dc970bb..4bcb7353d31f8 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -436,26 +436,109 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { // --------------- // None means propagation not implemented/supported for this node fn propagate_range_stats( - child_range_stats: &[RangeStats], + &self, + _child_range_stats: &[RangeStats], ) -> Result> { Ok(None) } fn propagate_null_stats( - child_range_stats: &[NullStats], + &self, + _child_range_stats: &[NullStats], ) -> Result> { Ok(None) } fn evaluate_pruning(&self) -> Result { - // This will be the default impl for stats propagation nodes (like arithmetic - // expressions) - // 1. Evaluate pruning for all child - // 2. Extract all range_stats from the child, call self's `propagagte_range_stats` - // , and build the output RangeStats - // 3. ditto for NullStats - // 4. Finally build the PruningIntermediate for current node - unimplemented!() + // Default impl for stats-propagation nodes (e.g. arithmetic expressions): + // 1) Evaluate pruning for all children. + // 2) If every child produced range/null stats, propagate them. + // 3) If no stats can be propagated, fall back to `Unsupported`. 
+ let children = self.children(); + if children.is_empty() { + return Ok(PruningIntermediate::Unsupported); + } + + let mut range_complete = true; + let mut null_complete = true; + let mut child_range_stats = Vec::with_capacity(children.len()); + let mut child_null_stats = Vec::with_capacity(children.len()); + + for child in children { + match child.evaluate_pruning()? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(range_stats) if range_complete => { + child_range_stats.push(range_stats); + } + _ => { + range_complete = false; + } + } + + match stats.null_stats { + Some(null_stats) if null_complete => { + child_null_stats.push(null_stats); + } + _ => { + null_complete = false; + } + } + } + // Without node-specific semantics, we can't combine a final pruning result here. + other => return Ok(other), + } + } + + let range_stats = if range_complete && !child_range_stats.is_empty() { + if let Some((first, rest)) = child_range_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.length, + stats.length, + "Range stats length mismatch between pruning children" + ); + } + } + self.propagate_range_stats(&child_range_stats)? + } else { + None + }; + + let null_stats = if null_complete && !child_null_stats.is_empty() { + if let Some((first, rest)) = child_null_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.length, + stats.length, + "Null stats length mismatch between pruning children" + ); + } + } + self.propagate_null_stats(&child_null_stats)? 
+ } else { + None + }; + + if let (Some(range_stats), Some(null_stats)) = + (range_stats.as_ref(), null_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.length, + null_stats.length, + "Range and null stats length mismatch for pruning" + ); + } + + match (range_stats, null_stats) { + (None, None) => Ok(PruningIntermediate::Unsupported), + (range_stats, null_stats) => { + Ok(PruningIntermediate::IntermediateStats(ColumnStats { + range_stats, + null_stats, + })) + } + } } } @@ -464,29 +547,127 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { /// bucket 1 has stat [10,15] -> AlwaysTrue /// bucket 2 has stat [0,5] -> AlwaysFalse /// bucket 3 has stat [0,10] -> Unknown +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PruningResult { AlwaysTrue, AlwaysFalse, Unknown, } +#[derive(Debug, Clone)] pub struct RangeStats { mins: Option, maxs: Option, length: usize, } +#[derive(Debug, Clone)] pub struct NullStats { null_counts: Option, row_counts: Option, length: usize, } +#[derive(Debug, Clone)] pub struct ColumnStats { range_stats: Option, null_stats: Option, } +impl RangeStats { + pub fn new( + mins: Option, + maxs: Option, + length: usize, + ) -> Result { + if let Some(ref mins) = mins { + assert_eq_or_internal_err!( + mins.len(), + length, + "Range mins length mismatch for pruning statistics" + ); + } + if let Some(ref maxs) = maxs { + assert_eq_or_internal_err!( + maxs.len(), + length, + "Range maxs length mismatch for pruning statistics" + ); + } + Ok(Self { mins, maxs, length }) + } + + pub fn len(&self) -> usize { + self.length + } + + pub fn mins(&self) -> Option<&ArrayRef> { + self.mins.as_ref() + } + + pub fn maxs(&self) -> Option<&ArrayRef> { + self.maxs.as_ref() + } +} + +impl NullStats { + pub fn new( + null_counts: Option, + row_counts: Option, + length: usize, + ) -> Result { + if let Some(ref null_counts) = null_counts { + assert_eq_or_internal_err!( + null_counts.len(), + length, + "Null counts length 
mismatch for pruning statistics" + ); + } + if let Some(ref row_counts) = row_counts { + assert_eq_or_internal_err!( + row_counts.len(), + length, + "Row counts length mismatch for pruning statistics" + ); + } + Ok(Self { + null_counts, + row_counts, + length, + }) + } + + pub fn len(&self) -> usize { + self.length + } + + pub fn null_counts(&self) -> Option<&ArrayRef> { + self.null_counts.as_ref() + } + + pub fn row_counts(&self) -> Option<&ArrayRef> { + self.row_counts.as_ref() + } +} + +impl ColumnStats { + pub fn new(range_stats: Option, null_stats: Option) -> Self { + Self { + range_stats, + null_stats, + } + } + + pub fn range_stats(&self) -> Option<&RangeStats> { + self.range_stats.as_ref() + } + + pub fn null_stats(&self) -> Option<&NullStats> { + self.null_stats.as_ref() + } +} + +#[derive(Debug, Clone)] pub enum PruningIntermediate { IntermediateStats(ColumnStats), IntermediateResult(PruningResult), diff --git a/datafusion/pruning/src/pruner.rs b/datafusion/pruning/src/pruner.rs index 1bdcec0e0cda1..cc11c165a0f61 100644 --- a/datafusion/pruning/src/pruner.rs +++ b/datafusion/pruning/src/pruner.rs @@ -16,4 +16,3 @@ // under the License. 
use arrow::array::ArrayRef; - From 154c5269c9da99375c7fcb9e000ddbfc9335e6b1 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 23 Dec 2025 12:41:13 +0800 Subject: [PATCH 03/11] cp --- .../physical-expr-common/src/physical_expr.rs | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 4bcb7353d31f8..4df11a4cfbd69 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -27,6 +27,7 @@ use arrow::array::{ArrayRef, BooleanArray, new_empty_array}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_common::pruning::PruningStatistics; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -449,14 +450,14 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { Ok(None) } - fn evaluate_pruning(&self) -> Result { + fn evaluate_pruning(&self, ctx: Arc) -> Result { // Default impl for stats-propagation nodes (e.g. arithmetic expressions): // 1) Evaluate pruning for all children. // 2) If every child produced range/null stats, propagate them. // 3) If no stats can be propagated, fall back to `Unsupported`. let children = self.children(); if children.is_empty() { - return Ok(PruningIntermediate::Unsupported); + return Ok(PruningIntermediate::empty_stats()); } let mut range_complete = true; @@ -465,7 +466,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { let mut child_null_stats = Vec::with_capacity(children.len()); for child in children { - match child.evaluate_pruning()? { + match child.evaluate_pruning(Arc::clone(&ctx))? 
{ PruningIntermediate::IntermediateStats(stats) => { match stats.range_stats { Some(range_stats) if range_complete => { @@ -530,15 +531,10 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { ); } - match (range_stats, null_stats) { - (None, None) => Ok(PruningIntermediate::Unsupported), - (range_stats, null_stats) => { - Ok(PruningIntermediate::IntermediateStats(ColumnStats { - range_stats, - null_stats, - })) - } - } + Ok(PruningIntermediate::IntermediateStats(ColumnStats { + range_stats, + null_stats, + })) } } @@ -610,6 +606,10 @@ impl RangeStats { } } +struct PruningContext { + stats: Arc, +} + impl NullStats { pub fn new( null_counts: Option, @@ -671,7 +671,13 @@ impl ColumnStats { pub enum PruningIntermediate { IntermediateStats(ColumnStats), IntermediateResult(PruningResult), - Unsupported, +} + +impl PruningIntermediate { + /// Create an `IntermediateStats` variant with no range or null statistics. + pub fn empty_stats() -> Self { + Self::IntermediateStats(ColumnStats::new(None, None)) + } } #[deprecated( From 6c36f37472f43cf75d4c2a727329c6db485db1e2 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 23 Dec 2025 16:03:47 +0800 Subject: [PATCH 04/11] add scalar range stat type --- Cargo.lock | 1 + .../physical-expr-common/src/physical_expr.rs | 86 ++++++++--- datafusion/physical-expr/Cargo.toml | 1 + .../physical-expr/src/expressions/column.rs | 34 ++++- datafusion/physical-expr/tests/pruning.rs | 114 +++++++++++++++ .../physical-expr/tests/pruning_utils.rs | 134 ++++++++++++++++++ 6 files changed, 349 insertions(+), 21 deletions(-) create mode 100644 datafusion/physical-expr/tests/pruning.rs create mode 100644 datafusion/physical-expr/tests/pruning_utils.rs diff --git a/Cargo.lock b/Cargo.lock index 663c64bf8a789..e9f6c76a88092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2499,6 +2499,7 @@ dependencies = [ "insta", "itertools 0.14.0", "parking_lot", + "parquet", "paste", "petgraph 0.8.3", "rand 
0.9.2", diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 4df11a4cfbd69..77d51c553477e 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -495,8 +495,8 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { if let Some((first, rest)) = child_range_stats.split_first() { for stats in rest { assert_eq_or_internal_err!( - first.length, - stats.length, + first.len(), + stats.len(), "Range stats length mismatch between pruning children" ); } @@ -525,7 +525,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { (range_stats.as_ref(), null_stats.as_ref()) { assert_eq_or_internal_err!( - range_stats.length, + range_stats.len(), null_stats.length, "Range and null stats length mismatch for pruning" ); @@ -551,10 +551,18 @@ pub enum PruningResult { } #[derive(Debug, Clone)] -pub struct RangeStats { - mins: Option, - maxs: Option, - length: usize, +pub enum RangeStats { + Values { + mins: Option, + maxs: Option, + length: usize, + }, + /// Represents a uniform literal value across all containers. + /// This variant make it easy to compare between literals and normal ranges representing + /// each containers' value range. 
+ /// + /// TODO: remove length -- seems redundant + Scalar { value: ScalarValue, length: usize }, } #[derive(Debug, Clone)] @@ -566,8 +574,8 @@ pub struct NullStats { #[derive(Debug, Clone)] pub struct ColumnStats { - range_stats: Option, - null_stats: Option, + pub range_stats: Option, + pub null_stats: Option, } impl RangeStats { @@ -590,26 +598,39 @@ impl RangeStats { "Range maxs length mismatch for pruning statistics" ); } - Ok(Self { mins, maxs, length }) + Ok(Self::Values { mins, maxs, length }) } - pub fn len(&self) -> usize { - self.length - } - - pub fn mins(&self) -> Option<&ArrayRef> { - self.mins.as_ref() + /// Create range stats for a constant literal across all containers. + /// + pub fn new_scalar(value: ScalarValue, length: usize) -> Result { + ScalarValue::iter_to_array(std::iter::repeat(value.clone()).take(length))?; + Ok(Self::Scalar { value, length }) } - pub fn maxs(&self) -> Option<&ArrayRef> { - self.maxs.as_ref() + pub fn len(&self) -> usize { + match self { + RangeStats::Values { length, .. } | RangeStats::Scalar { length, .. 
} => { + *length + } + } } } -struct PruningContext { +pub struct PruningContext { stats: Arc, } +impl PruningContext { + pub fn new(stats: Arc) -> Self { + Self { stats } + } + + pub fn statistics(&self) -> &Arc { + &self.stats + } +} + impl NullStats { pub fn new( null_counts: Option, @@ -910,9 +931,10 @@ pub fn is_volatile(expr: &Arc) -> bool { #[cfg(test)] mod test { - use crate::physical_expr::PhysicalExpr; + use crate::physical_expr::{PhysicalExpr, RangeStats}; use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Schema}; + use datafusion_common::ScalarValue; use datafusion_expr_common::columnar_value::ColumnarValue; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -1093,4 +1115,28 @@ mod test { &BooleanArray::from(vec![true; 5]), ); } + + #[test] + fn range_stats_scalar_variant() { + let stats = RangeStats::new_scalar(ScalarValue::Int64(Some(42)), 3).unwrap(); + assert_eq!(stats.len(), 3); + + let (mins, maxs) = match &stats { + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) + .unwrap(); + (arr.clone(), arr) + } + RangeStats::Values { mins, maxs, .. 
} => { + (mins.clone().expect("mins"), maxs.clone().expect("maxs")) + } + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + assert_eq!(maxs, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + } } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 1b23beeaa37cc..66720a43c506b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -68,6 +68,7 @@ datafusion-functions = { workspace = true } insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } +parquet = { workspace = true } [[bench]] harness = false diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 8c7e8c319fff4..6ec3b48a27996 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -28,8 +28,11 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{Result, internal_err, plan_err}; +use datafusion_common::{Column as DFColumn, Result, internal_err, plan_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, NullStats, PruningContext, PruningIntermediate, RangeStats, +}; /// Represents the column at a given index in a RecordBatch /// @@ -143,6 +146,35 @@ impl PhysicalExpr for Column { Ok(self) } + /// Read the column statistics from `PruningContext`, transform the input stat + /// to the stats in the internal representation (`PruningIntermediate`) + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let pruning_column = DFColumn::from_name(self.name()); + let stats = ctx.statistics(); + let num_containers = stats.num_containers(); + + let min_values = stats.min_values(&pruning_column); + let 
max_values = stats.max_values(&pruning_column); + let range_stats = if min_values.is_some() || max_values.is_some() { + Some(RangeStats::new(min_values, max_values, num_containers)?) + } else { + None + }; + + let null_counts = stats.null_counts(&pruning_column); + let row_counts = stats.row_counts(&pruning_column); + let null_stats = if null_counts.is_some() || row_counts.is_some() { + Some(NullStats::new(null_counts, row_counts, num_containers)?) + } else { + None + }; + + Ok(PruningIntermediate::IntermediateStats(ColumnStats::new( + range_stats, + null_stats, + ))) + } + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs new file mode 100644 index 0000000000000..8677806e2e541 --- /dev/null +++ b/datafusion/physical-expr/tests/pruning.rs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod pruning_utils; + +use std::collections::HashSet; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, UInt64Array}; +use datafusion_common::ScalarValue; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::expressions::{Column, lit}; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, RangeStats, +}; + +use crate::pruning_utils::{DummyStats, MockPruningStatistics}; + +#[test] +fn column_pruning_uses_parquet_stats() { + // Dummy stats: two containers with constant value 10 and 3 rows each. + let pruning_stats = Arc::new(MockPruningStatistics::from_scalar( + "a", + ScalarValue::Int32(Some(10)), + 2, + 3, + )); + + let context = Arc::new(PruningContext::new(pruning_stats)); + let column_expr = Column::new("a", 0); + + match column_expr.evaluate_pruning(context).unwrap() { + PruningIntermediate::IntermediateStats(stats) => { + let range_stats = stats.range_stats().expect("range stats"); + assert_eq!(range_stats.len(), 2); + + let (mins, maxs) = match range_stats { + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + .. 
+ } => (mins.clone(), maxs.clone()), + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) + .unwrap(); + (arr.clone(), arr) + } + _ => panic!("missing min/max stats"), + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int32Array::from(vec![Some(10), Some(10)])); + assert_eq!(maxs, &Int32Array::from(vec![Some(10), Some(10)])); + + let null_stats = stats.null_stats().expect("null stats"); + assert_eq!(null_stats.len(), 2); + + let null_counts = null_stats + .null_counts() + .expect("null counts") + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(null_counts, &UInt64Array::from(vec![Some(0), Some(0)])); + + let row_counts = null_stats + .row_counts() + .expect("row counts") + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(row_counts, &UInt64Array::from(vec![Some(3), Some(3)])); + } + other => panic!("expected stats, got {other:?}"), + } +} + +#[test] +fn lit_basic() { + use datafusion_common::pruning::PruningStatistics; + + let lit_expr = lit(5); + let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); + let stat = lit_expr.evaluate_pruning(ctx); + + if let Ok(pruning_intermediate) = stat + && let PruningIntermediate::IntermediateStats(stat) = pruning_intermediate + && let ColumnStats { + range_stats, + null_stats, + } = stat + && let Some(range_stat) = range_stats + && let RangeStats::Scalar { value, length } = range_stat + { + // assert value is 5 + } +} diff --git a/datafusion/physical-expr/tests/pruning_utils.rs b/datafusion/physical-expr/tests/pruning_utils.rs new file mode 100644 index 0000000000000..67144ca3eb9ed --- /dev/null +++ b/datafusion/physical-expr/tests/pruning_utils.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_common::{Column, ScalarValue}; + +pub struct DummyStats; + +impl PruningStatistics for DummyStats { + fn min_values(&self, _: &Column) -> Option { + None + } + fn max_values(&self, _: &Column) -> Option { + None + } + fn num_containers(&self) -> usize { + 0 + } + fn null_counts(&self, _: &Column) -> Option { + None + } + fn row_counts(&self, _: &Column) -> Option { + None + } + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} + +/// Simple `PruningStatistics` implementation backed by literal arrays. +/// Useful for testing expression pruning semantics without plumbing +/// actual file/row group metadata. 
+pub struct MockPruningStatistics { + column: String, + min_values: ArrayRef, + max_values: ArrayRef, + null_counts: ArrayRef, + row_counts: Option, + num_containers: usize, +} + +impl MockPruningStatistics { + pub fn new( + column: impl Into, + min_values: ArrayRef, + max_values: ArrayRef, + null_counts: ArrayRef, + row_counts: Option, + ) -> Self { + let num_containers = min_values.len(); + Self { + column: column.into(), + min_values, + max_values, + null_counts, + row_counts, + num_containers, + } + } + + /// Convenience constructor for uniform literal stats. + pub fn from_scalar( + column: impl Into, + value: ScalarValue, + num_containers: usize, + rows_per_container: u64, + ) -> Self { + let mins = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(num_containers), + ) + .expect("scalar to array"); + let maxs = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(num_containers), + ) + .expect("scalar to array"); + let null_counts = ScalarValue::iter_to_array( + std::iter::repeat(ScalarValue::UInt64(Some(0))).take(num_containers), + ) + .expect("zeros"); + let row_counts = ScalarValue::iter_to_array( + std::iter::repeat(ScalarValue::UInt64(Some(rows_per_container))) + .take(num_containers), + ) + .expect("rows"); + + Self::new(column, mins, maxs, null_counts, Some(row_counts)) + } +} + +impl PruningStatistics for MockPruningStatistics { + fn min_values(&self, column: &Column) -> Option { + (column.name == self.column).then(|| Arc::clone(&self.min_values)) + } + + fn max_values(&self, column: &Column) -> Option { + (column.name == self.column).then(|| Arc::clone(&self.max_values)) + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, column: &Column) -> Option { + (column.name == self.column).then(|| Arc::clone(&self.null_counts)) + } + + fn row_counts(&self, column: &Column) -> Option { + (column.name == self.column) + .then(|| self.row_counts.as_ref().map(Arc::clone)) + .flatten() + } 
+ + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} From 614ce3c8e19458b017100e8f687c370fcaf03731 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 23 Dec 2025 16:14:52 +0800 Subject: [PATCH 05/11] impl evaluate_pruning() for literal values --- .../physical-expr-common/src/physical_expr.rs | 1 - .../physical-expr/src/expressions/literal.rs | 12 +++++++++ datafusion/physical-expr/tests/pruning.rs | 25 ++++++++----------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 77d51c553477e..330ba90e264f9 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -604,7 +604,6 @@ impl RangeStats { /// Create range stats for a constant literal across all containers. /// pub fn new_scalar(value: ScalarValue, length: usize) -> Result { - ScalarValue::iter_to_array(std::iter::repeat(value.clone()).take(length))?; Ok(Self::Scalar { value, length }) } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 1f3fefc60b7ad..f054c59e33d9c 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -34,6 +34,9 @@ use datafusion_expr::Expr; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, RangeStats, +}; /// Represents a literal value #[derive(Debug, PartialEq, Eq, Clone)] @@ -112,6 +115,15 @@ impl PhysicalExpr for Literal { Ok(ColumnarValue::Scalar(self.value.clone())) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let length = 
ctx.statistics().num_containers(); + let range = RangeStats::new_scalar(self.value.clone(), length)?; + Ok(PruningIntermediate::IntermediateStats(ColumnStats::new( + Some(range), + None, + ))) + } + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 8677806e2e541..51232f5c3864d 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -17,10 +17,9 @@ mod pruning_utils; -use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{ArrayRef, Int32Array, UInt64Array}; +use arrow::array::{Int32Array, UInt64Array}; use datafusion_common::ScalarValue; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::{Column, lit}; @@ -94,21 +93,19 @@ fn column_pruning_uses_parquet_stats() { #[test] fn lit_basic() { - use datafusion_common::pruning::PruningStatistics; - let lit_expr = lit(5); let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); - let stat = lit_expr.evaluate_pruning(ctx); + let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); - if let Ok(pruning_intermediate) = stat - && let PruningIntermediate::IntermediateStats(stat) = pruning_intermediate - && let ColumnStats { - range_stats, + match stat { + PruningIntermediate::IntermediateStats(ColumnStats { + range_stats: Some(RangeStats::Scalar { value, length }), null_stats, - } = stat - && let Some(range_stat) = range_stats - && let RangeStats::Scalar { value, length } = range_stat - { - // assert value is 5 + }) => { + assert_eq!(value, ScalarValue::Int32(Some(5))); + assert_eq!(length, 0); + assert!(null_stats.is_none()); + } + other => panic!("unexpected pruning result: {other:?}"), } } From 47628352b4aebbfccfb7a697076413083add7e08 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 24 Dec 2025 11:34:07 +0800 Subject: [PATCH 06/11] impl stat propagation for * operator --- 
.../physical-expr-common/src/physical_expr.rs | 24 -- .../physical-expr/src/expressions/binary.rs | 387 +++++++++++++++++- datafusion/physical-expr/tests/pruning.rs | 316 ++++++++++---- 3 files changed, 619 insertions(+), 108 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 330ba90e264f9..fc03695d8f593 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -1114,28 +1114,4 @@ mod test { &BooleanArray::from(vec![true; 5]), ); } - - #[test] - fn range_stats_scalar_variant() { - let stats = RangeStats::new_scalar(ScalarValue::Int64(Some(42)), 3).unwrap(); - assert_eq!(stats.len(), 3); - - let (mins, maxs) = match &stats { - RangeStats::Scalar { value, length } => { - let arr = ScalarValue::iter_to_array( - std::iter::repeat(value.clone()).take(*length), - ) - .unwrap(); - (arr.clone(), arr) - } - RangeStats::Values { mins, maxs, .. 
} => { - (mins.clone().expect("mins"), maxs.clone().expect("maxs")) - } - }; - - let mins = mins.as_any().downcast_ref::().unwrap(); - let maxs = maxs.as_any().downcast_ref::().unwrap(); - assert_eq!(mins, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); - assert_eq!(maxs, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); - } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 8df09c22bbd8d..5dcd1e32668f9 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -29,7 +29,9 @@ use arrow::compute::{SlicesIterator, cast, filter_record_batch}; use arrow::datatypes::*; use arrow::error::ArrowError; use datafusion_common::cast::as_boolean_array; -use datafusion_common::{Result, ScalarValue, internal_err, not_impl_err}; +use datafusion_common::{ + Result, ScalarValue, assert_eq_or_internal_err, internal_err, not_impl_err, +}; use datafusion_expr::binary::BinaryTypeCoercer; use datafusion_expr::interval_arithmetic::{Interval, apply_operator}; use datafusion_expr::sort_properties::ExprProperties; @@ -40,6 +42,10 @@ use datafusion_expr::statistics::{ }; use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr_common::datum::{apply, apply_cmp}; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, +}; +use std::cmp::Ordering; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, @@ -297,6 +303,141 @@ impl PhysicalExpr for BinaryExpr { .map(ColumnarValue::Array) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + use Operator::*; + let is_cmp = matches!(self.op, Eq | NotEq | Lt | LtEq | Gt | GtEq); + if !is_cmp { + return self.default_pruning(ctx); + } + + let left = self.left.evaluate_pruning(Arc::clone(&ctx))?; + let right = self.right.evaluate_pruning(ctx)?; + + let (left_stats, 
right_stats) = match (left, right) { + ( + PruningIntermediate::IntermediateStats(ls), + PruningIntermediate::IntermediateStats(rs), + ) => (ls, rs), + (other, PruningIntermediate::IntermediateStats(_)) + | (PruningIntermediate::IntermediateStats(_), other) => return Ok(other), + (l, r) => { + return Ok(if matches!(l, PruningIntermediate::IntermediateResult(_)) { + l + } else { + r + }); + } + }; + + let l_range = range_bounds(left_stats.range_stats.as_ref()); + let r_range = range_bounds(right_stats.range_stats.as_ref()); + + let (l_range, r_range) = match (l_range, r_range) { + (Some(l), Some(r)) if l.len() == r.len() && !l.is_empty() => (l, r), + (Some(l), Some(r)) if l.is_empty() && r.is_empty() => { + match ( + left_stats.range_stats.as_ref(), + right_stats.range_stats.as_ref(), + ) { + ( + Some(RangeStats::Scalar { value: lv, .. }), + Some(RangeStats::Scalar { value: rv, .. }), + ) => { + let res = compare_ranges( + &self.op, + &(Some(lv.clone()), Some(lv.clone())), + &(Some(rv.clone()), Some(rv.clone())), + ) + .unwrap_or(false); + let pruning = if res { + PruningResult::AlwaysTrue + } else { + PruningResult::AlwaysFalse + }; + return Ok(PruningIntermediate::IntermediateResult(pruning)); + } + _ => { + return Ok(PruningIntermediate::IntermediateResult( + PruningResult::Unknown, + )); + } + } + } + _ => { + return Ok(PruningIntermediate::IntermediateResult( + PruningResult::Unknown, + )); + } + }; + + let mut all_true = true; + let mut all_false = true; + + for (l, r) in l_range.into_iter().zip(r_range.into_iter()) { + let Some(result) = compare_ranges(&self.op, &l, &r) else { + all_true = false; + all_false = false; + break; + }; + if result { + all_false = false; + } else { + all_true = false; + } + } + + let res = if all_true { + PruningResult::AlwaysTrue + } else if all_false { + PruningResult::AlwaysFalse + } else { + PruningResult::Unknown + }; + Ok(PruningIntermediate::IntermediateResult(res)) + } + + fn propagate_range_stats( + &self, + 
child_range_stats: &[RangeStats], + ) -> Result> { + use Operator::Multiply; + + match self.op { + Multiply => match (&child_range_stats[0], &child_range_stats[1]) { + ( + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + length, + }, + RangeStats::Scalar { value: scalar, .. }, + ) => multiply_range_by_scalar(mins, maxs, *length, scalar), + ( + RangeStats::Scalar { + value: scalar, + length, + }, + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + .. + }, + ) => multiply_range_by_scalar(mins, maxs, *length, scalar), + _ => unimplemented!( + "Range propagation for * operator on children {:?} and {:?} has not been implemented yet", + child_range_stats[0], + child_range_stats[1] + ), + }, + _ => { + unimplemented!( + "Range propagation for {} has not been implemented", + self.op + ) + } + } + } + fn children(&self) -> Vec<&Arc> { vec![&self.left, &self.right] } @@ -746,6 +887,250 @@ fn check_short_circuit<'a>( ShortCircuitStrategy::None } +fn multiply_range_by_scalar( + mins: &ArrayRef, + maxs: &ArrayRef, + length: usize, + scalar: &ScalarValue, +) -> Result> { + let result_type = BinaryTypeCoercer::new( + mins.data_type(), + &Operator::Multiply, + &scalar.data_type(), + ) + .get_result_type()?; + + let mins = match cast(mins.as_ref(), &result_type) { + Ok(arr) => arr, + Err(_) => return Ok(None), + }; + let maxs = match cast(maxs.as_ref(), &result_type) { + Ok(arr) => arr, + Err(_) => return Ok(None), + }; + let scalar = scalar.cast_to(&result_type)?; + + let mut out_mins = Vec::with_capacity(length); + let mut out_maxs = Vec::with_capacity(length); + + for idx in 0..length { + let min = ScalarValue::try_from_array(mins.as_ref(), idx)?; + let max = ScalarValue::try_from_array(maxs.as_ref(), idx)?; + + let p1 = min.mul_checked(&scalar)?; + let p2 = max.mul_checked(&scalar)?; + + let (low, high) = match p1.partial_cmp(&p2) { + Some(Ordering::Greater) => (p2, p1), + Some(_) => (p1, p2), + None => return Ok(None), + }; + out_mins.push(low); + 
out_maxs.push(high); + } + + let mins = ScalarValue::iter_to_array(out_mins.into_iter())?; + let maxs = ScalarValue::iter_to_array(out_maxs.into_iter())?; + RangeStats::new(Some(mins), Some(maxs), length).map(Some) +} + +fn range_bounds( + stats: Option<&RangeStats>, +) -> Option, Option)>> { + match stats? { + RangeStats::Values { mins, maxs, length } => { + let mins = mins.as_ref()?; + let maxs = maxs.as_ref()?; + let mut out = Vec::with_capacity(*length); + for i in 0..*length { + let min = ScalarValue::try_from_array(mins, i).ok(); + let max = ScalarValue::try_from_array(maxs, i).ok(); + out.push((min, max)); + } + Some(out) + } + RangeStats::Scalar { value, length } => Some( + (0..*length) + .map(|_| (Some(value.clone()), Some(value.clone()))) + .collect(), + ), + } +} + +fn compare_ranges( + op: &Operator, + left: &(Option, Option), + right: &(Option, Option), +) -> Option { + use Operator::*; + let (lmin, lmax) = left; + let (rmin, rmax) = right; + + let ord_lmin_rmax = lmin + .as_ref() + .zip(rmax.as_ref()) + .and_then(|(l, r)| l.partial_cmp(r)); + let ord_lmax_rmin = lmax + .as_ref() + .zip(rmin.as_ref()) + .and_then(|(l, r)| l.partial_cmp(r)); + + match op { + Gt => { + if matches!(ord_lmin_rmax, Some(Ordering::Greater)) { + Some(true) + } else if matches!(ord_lmax_rmin, Some(Ordering::Less | Ordering::Equal)) { + Some(false) + } else { + None + } + } + GtEq => { + if matches!(ord_lmin_rmax, Some(Ordering::Greater | Ordering::Equal)) { + Some(true) + } else if matches!(ord_lmax_rmin, Some(Ordering::Less)) { + Some(false) + } else { + None + } + } + Lt => { + if matches!(ord_lmax_rmin, Some(Ordering::Less)) { + Some(true) + } else if matches!(ord_lmin_rmax, Some(Ordering::Greater | Ordering::Equal)) { + Some(false) + } else { + None + } + } + LtEq => { + if matches!(ord_lmax_rmin, Some(Ordering::Less | Ordering::Equal)) { + Some(true) + } else if matches!(ord_lmin_rmax, Some(Ordering::Greater)) { + Some(false) + } else { + None + } + } + Eq => { + if let 
(Some(lmin), Some(lmax), Some(rmin), Some(rmax)) = + (lmin.as_ref(), lmax.as_ref(), rmin.as_ref(), rmax.as_ref()) + { + if lmin == lmax && rmin == rmax && lmin == rmin { + return Some(true); + } + if matches!(lmax.partial_cmp(rmin), Some(Ordering::Less)) + || matches!(rmax.partial_cmp(lmin), Some(Ordering::Less)) + { + return Some(false); + } + } + None + } + NotEq => { + if let (Some(lmin), Some(lmax), Some(rmin), Some(rmax)) = + (lmin.as_ref(), lmax.as_ref(), rmin.as_ref(), rmax.as_ref()) + { + if lmin == lmax && rmin == rmax && lmin == rmin { + return Some(false); + } + if matches!(lmax.partial_cmp(rmin), Some(Ordering::Less)) + || matches!(rmax.partial_cmp(lmin), Some(Ordering::Less)) + { + return Some(true); + } + } + None + } + _ => None, + } +} + +impl BinaryExpr { + #[allow(dead_code)] + fn default_pruning(&self, ctx: Arc) -> Result { + let children = self.children(); + if children.is_empty() { + return Ok(PruningIntermediate::empty_stats()); + } + + let mut range_complete = true; + let mut null_complete = true; + let mut child_range_stats = Vec::with_capacity(children.len()); + let mut child_null_stats = Vec::with_capacity(children.len()); + + for child in children { + match child.evaluate_pruning(Arc::clone(&ctx))? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(range_stats) if range_complete => { + child_range_stats.push(range_stats); + } + _ => { + range_complete = false; + } + } + + match stats.null_stats { + Some(null_stats) if null_complete => { + child_null_stats.push(null_stats); + } + _ => { + null_complete = false; + } + } + } + other => return Ok(other), + } + } + + let range_stats = if range_complete && !child_range_stats.is_empty() { + if let Some((first, rest)) = child_range_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Range stats length mismatch between pruning children" + ); + } + } + self.propagate_range_stats(&child_range_stats)? 
+ } else { + None + }; + + let null_stats = if null_complete && !child_null_stats.is_empty() { + if let Some((first, rest)) = child_null_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Null stats length mismatch between pruning children" + ); + } + } + self.propagate_null_stats(&child_null_stats)? + } else { + None + }; + + if let (Some(range_stats), Some(null_stats)) = + (range_stats.as_ref(), null_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + null_stats.len(), + "Range and null stats length mismatch for pruning" + ); + } + + Ok(PruningIntermediate::IntermediateStats(ColumnStats { + range_stats, + null_stats, + })) + } +} + /// Creates a new boolean array based on the evaluation of the right expression, /// but only for positions where the left_result is true. /// diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 51232f5c3864d..d9860226f2421 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -17,95 +17,245 @@ mod pruning_utils; -use std::sync::Arc; - -use arrow::array::{Int32Array, UInt64Array}; -use datafusion_common::ScalarValue; -use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::expressions::{Column, lit}; -use datafusion_physical_expr_common::physical_expr::{ - ColumnStats, PruningContext, PruningIntermediate, RangeStats, -}; - -use crate::pruning_utils::{DummyStats, MockPruningStatistics}; - -#[test] -fn column_pruning_uses_parquet_stats() { - // Dummy stats: two containers with constant value 10 and 3 rows each. 
- let pruning_stats = Arc::new(MockPruningStatistics::from_scalar( - "a", - ScalarValue::Int32(Some(10)), - 2, - 3, - )); - - let context = Arc::new(PruningContext::new(pruning_stats)); - let column_expr = Column::new("a", 0); - - match column_expr.evaluate_pruning(context).unwrap() { - PruningIntermediate::IntermediateStats(stats) => { - let range_stats = stats.range_stats().expect("range stats"); - assert_eq!(range_stats.len(), 2); - - let (mins, maxs) = match range_stats { - RangeStats::Values { - mins: Some(mins), - maxs: Some(maxs), - .. - } => (mins.clone(), maxs.clone()), - RangeStats::Scalar { value, length } => { - let arr = ScalarValue::iter_to_array( - std::iter::repeat(value.clone()).take(*length), - ) +mod test { + use std::sync::Arc; + + use arrow::array::{Int32Array, Int64Array, UInt64Array}; + use arrow::datatypes::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::Operator; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit}; + use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, + }; + + use crate::pruning_utils::{DummyStats, MockPruningStatistics}; + + #[test] + fn column_pruning_uses_parquet_stats() { + // Dummy stats: two containers with constant value 10 and 3 rows each. + let pruning_stats = Arc::new(MockPruningStatistics::from_scalar( + "a", + ScalarValue::Int32(Some(10)), + 2, + 3, + )); + + let context = Arc::new(PruningContext::new(pruning_stats)); + let column_expr = Column::new("a", 0); + + match column_expr.evaluate_pruning(context).unwrap() { + PruningIntermediate::IntermediateStats(stats) => { + let range_stats = stats.range_stats().expect("range stats"); + assert_eq!(range_stats.len(), 2); + + let (mins, maxs) = match range_stats { + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + .. 
+ } => (mins.clone(), maxs.clone()), + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) + .unwrap(); + (arr.clone(), arr) + } + _ => panic!("missing min/max stats"), + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int32Array::from(vec![Some(10), Some(10)])); + assert_eq!(maxs, &Int32Array::from(vec![Some(10), Some(10)])); + + let null_stats = stats.null_stats().expect("null stats"); + assert_eq!(null_stats.len(), 2); + + let null_counts = null_stats + .null_counts() + .expect("null counts") + .as_any() + .downcast_ref::() .unwrap(); - (arr.clone(), arr) - } - _ => panic!("missing min/max stats"), - }; - - let mins = mins.as_any().downcast_ref::().unwrap(); - let maxs = maxs.as_any().downcast_ref::().unwrap(); - assert_eq!(mins, &Int32Array::from(vec![Some(10), Some(10)])); - assert_eq!(maxs, &Int32Array::from(vec![Some(10), Some(10)])); - - let null_stats = stats.null_stats().expect("null stats"); - assert_eq!(null_stats.len(), 2); - - let null_counts = null_stats - .null_counts() - .expect("null counts") - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(null_counts, &UInt64Array::from(vec![Some(0), Some(0)])); + assert_eq!(null_counts, &UInt64Array::from(vec![Some(0), Some(0)])); + + let row_counts = null_stats + .row_counts() + .expect("row counts") + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(row_counts, &UInt64Array::from(vec![Some(3), Some(3)])); + } + other => panic!("expected stats, got {other:?}"), + } + } + + #[test] + fn lit_basic() { + let lit_expr = lit(5); + let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); + let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); + + match stat { + PruningIntermediate::IntermediateStats(ColumnStats { + range_stats: Some(RangeStats::Scalar { value, length }), + null_stats, + }) => { + assert_eq!(value, 
ScalarValue::Int32(Some(5))); + assert_eq!(length, 0); + assert!(null_stats.is_none()); + } + other => panic!("unexpected pruning result: {other:?}"), + } + } - let row_counts = null_stats - .row_counts() - .expect("row counts") - .as_any() - .downcast_ref::() + #[test] + fn range_stats_scalar_variant() { + let stats = RangeStats::new_scalar(ScalarValue::Int64(Some(42)), 3).unwrap(); + assert_eq!(stats.len(), 3); + + let (mins, maxs) = match &stats { + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) .unwrap(); - assert_eq!(row_counts, &UInt64Array::from(vec![Some(3), Some(3)])); + (arr.clone(), arr) + } + RangeStats::Values { mins, maxs, .. } => { + (mins.clone().expect("mins"), maxs.clone().expect("maxs")) + } + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + assert_eq!(maxs, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + } + + #[test] + fn compare_literal_literal_prunes() { + let expr = BinaryExpr::new(lit(5), Operator::Gt, lit(3)); + let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(PruningResult::AlwaysTrue) => {} + other => panic!("unexpected result: {other:?}"), } - other => panic!("expected stats, got {other:?}"), } -} -#[test] -fn lit_basic() { - let lit_expr = lit(5); - let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); - let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); - - match stat { - PruningIntermediate::IntermediateStats(ColumnStats { - range_stats: Some(RangeStats::Scalar { value, length }), - null_stats, - }) => { - assert_eq!(value, ScalarValue::Int32(Some(5))); - assert_eq!(length, 0); - assert!(null_stats.is_none()); + #[test] + fn compare_column_literal_prunes() { + 
let stats = Arc::new(MockPruningStatistics::from_scalar( + "a", + ScalarValue::Int32(Some(10)), + 2, + 3, + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Lt, lit(5)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(PruningResult::AlwaysFalse) => {} + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn compare_column_ranges_prunes() { + // All ranges strictly greater than the literal -> AlwaysTrue + let mins = Arc::new(Int32Array::from(vec![Some(10), Some(8)])); + let maxs = Arc::new(Int32Array::from(vec![Some(20), Some(12)])); + let zeros = Arc::new(UInt64Array::from(vec![Some(0), Some(0)])); + let rows = Arc::new(UInt64Array::from(vec![Some(3), Some(3)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + mins.clone(), + maxs.clone(), + zeros.clone(), + Some(rows.clone()), + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Gt, lit(5)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(PruningResult::AlwaysTrue) => {} + other => panic!("unexpected result: {other:?}"), + } + + // Mixed range that overlaps the literal -> Unknown + let mins = Arc::new(Int32Array::from(vec![Some(1), Some(2)])); + let maxs = Arc::new(Int32Array::from(vec![Some(9), Some(4)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + mins, + maxs, + zeros, + Some(rows), + )); + let ctx = Arc::new(PruningContext::new(stats)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(PruningResult::Unknown) => {} + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn multiply_range_prunes() { + let mins = Arc::new(Int32Array::from(vec![Some(10)])); + let maxs = Arc::new(Int32Array::from(vec![Some(20)])); + let zeros = 
Arc::new(UInt64Array::from(vec![Some(0)])); + let rows = Arc::new(UInt64Array::from(vec![Some(1)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + mins, + maxs, + zeros, + Some(rows), + )); + let ctx = Arc::new(PruningContext::new(stats)); + + let mult_expr = + BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Multiply, lit(7)); + let mult_stats = mult_expr + .evaluate_pruning(Arc::clone(&ctx)) + .expect("multiplication pruning ok"); + match mult_stats { + PruningIntermediate::IntermediateStats(ColumnStats { + range_stats: + Some(RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + length, + }), + null_stats, + }) => { + assert_eq!(length, 1); + let min_scalar = + ScalarValue::try_from_array(mins.as_ref(), 0).expect("min scalar"); + let max_scalar = + ScalarValue::try_from_array(maxs.as_ref(), 0).expect("max scalar"); + let min_scalar = min_scalar.cast_to(&DataType::Int64).unwrap(); + let max_scalar = max_scalar.cast_to(&DataType::Int64).unwrap(); + assert_eq!(min_scalar, ScalarValue::Int64(Some(70))); + assert_eq!(max_scalar, ScalarValue::Int64(Some(140))); + assert!(null_stats.is_none()); + } + other => panic!("unexpected pruning result: {other:?}"), + } + + // (a*7) < 10 + let expr = BinaryExpr::new(Arc::new(mult_expr), Operator::Lt, lit(10)); + + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(PruningResult::AlwaysFalse) => {} + other => panic!("unexpected result: {other:?}"), } - other => panic!("unexpected pruning result: {other:?}"), } } From ce79d9174527becbda661eb28f782d048aa9c938 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 24 Dec 2025 11:59:50 +0800 Subject: [PATCH 07/11] impl stat propagation for IS NULL --- .../physical-expr-common/src/physical_expr.rs | 3 +- .../physical-expr/src/expressions/binary.rs | 35 +++++------- .../physical-expr/src/expressions/is_null.rs | 53 +++++++++++++++++++ datafusion/physical-expr/tests/pruning.rs | 44 
++++++++++++--- 4 files changed, 105 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index fc03695d8f593..edb369780504f 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -687,10 +687,11 @@ impl ColumnStats { } } +// TODO: should include length (container count) #[derive(Debug, Clone)] pub enum PruningIntermediate { IntermediateStats(ColumnStats), - IntermediateResult(PruningResult), + IntermediateResult(Vec), } impl PruningIntermediate { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 5dcd1e32668f9..f584d3eaa589b 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -354,46 +354,35 @@ impl PhysicalExpr for BinaryExpr { } else { PruningResult::AlwaysFalse }; - return Ok(PruningIntermediate::IntermediateResult(pruning)); + return Ok(PruningIntermediate::IntermediateResult(vec![ + pruning, + ])); } _ => { return Ok(PruningIntermediate::IntermediateResult( - PruningResult::Unknown, + vec![PruningResult::Unknown], )); } } } _ => { return Ok(PruningIntermediate::IntermediateResult( - PruningResult::Unknown, + vec![PruningResult::Unknown], )); } }; - let mut all_true = true; - let mut all_false = true; - + let mut results = Vec::with_capacity(l_range.len()); for (l, r) in l_range.into_iter().zip(r_range.into_iter()) { - let Some(result) = compare_ranges(&self.op, &l, &r) else { - all_true = false; - all_false = false; - break; + let res = match compare_ranges(&self.op, &l, &r) { + Some(true) => PruningResult::AlwaysTrue, + Some(false) => PruningResult::AlwaysFalse, + None => PruningResult::Unknown, }; - if result { - all_false = false; - } else { - all_true = false; - } + results.push(res); } - let res = if all_true { - PruningResult::AlwaysTrue - } 
else if all_false { - PruningResult::AlwaysFalse - } else { - PruningResult::Unknown - }; - Ok(PruningIntermediate::IntermediateResult(res)) + Ok(PruningIntermediate::IntermediateResult(results)) } fn propagate_range_stats( diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 356fe2a866672..d5a38d3276e9f 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -18,6 +18,7 @@ //! IS NULL expression use crate::PhysicalExpr; +use arrow::array::{Array, UInt64Array}; use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, @@ -25,6 +26,9 @@ use arrow::{ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{ + PruningContext, PruningIntermediate, PruningResult, +}; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -92,6 +96,55 @@ impl PhysicalExpr for IsNullExpr { } } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + use datafusion_physical_expr_common::physical_expr::PruningResult::*; + + let child = self.arg.evaluate_pruning(ctx)?; + match child { + PruningIntermediate::IntermediateStats(stats) => { + if let Some(null_stats) = stats.null_stats() { + if let (Some(null_counts), Some(row_counts)) = + (null_stats.null_counts(), null_stats.row_counts()) + { + if let (Some(null_counts), Some(row_counts)) = ( + null_counts.as_any().downcast_ref::(), + row_counts.as_any().downcast_ref::(), + ) { + let len = null_counts.len(); + if len == row_counts.len() { + let mut results = Vec::with_capacity(len); + for i in 0..len { + let res = if null_counts.is_null(i) + || row_counts.is_null(i) + { + Unknown + } else { + let n = null_counts.value(i); + let r = row_counts.value(i); + if n == 0 { + AlwaysFalse + } else if n == r { + AlwaysTrue + } else { + Unknown + } + }; + results.push(res); + } + + return 
Ok(PruningIntermediate::IntermediateResult( + results, + )); + } + } + } + } + Ok(PruningIntermediate::IntermediateResult(vec![Unknown])) + } + other => Ok(other), + } + } + fn children(&self) -> Vec<&Arc> { vec![&self.arg] } diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index d9860226f2421..6f13d51b5f1f3 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -25,7 +25,7 @@ mod test { use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit}; + use datafusion_physical_expr::expressions::{BinaryExpr, Column, is_null, lit}; use datafusion_physical_expr_common::physical_expr::{ ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, }; @@ -143,7 +143,9 @@ mod test { let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); let res = expr.evaluate_pruning(ctx).unwrap(); match res { - PruningIntermediate::IntermediateResult(PruningResult::AlwaysTrue) => {} + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysTrue]) + } other => panic!("unexpected result: {other:?}"), } } @@ -160,7 +162,9 @@ mod test { let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Lt, lit(5)); let res = expr.evaluate_pruning(ctx).unwrap(); match res { - PruningIntermediate::IntermediateResult(PruningResult::AlwaysFalse) => {} + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysFalse; 2]) + } other => panic!("unexpected result: {other:?}"), } } @@ -183,7 +187,9 @@ mod test { let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Gt, lit(5)); let res = expr.evaluate_pruning(ctx).unwrap(); match res { - PruningIntermediate::IntermediateResult(PruningResult::AlwaysTrue) => {} + 
PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysTrue; 2]) + } other => panic!("unexpected result: {other:?}"), } @@ -200,7 +206,12 @@ mod test { let ctx = Arc::new(PruningContext::new(stats)); let res = expr.evaluate_pruning(ctx).unwrap(); match res { - PruningIntermediate::IntermediateResult(PruningResult::Unknown) => {} + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![PruningResult::Unknown, PruningResult::AlwaysFalse] + ) + } other => panic!("unexpected result: {other:?}"), } } @@ -254,7 +265,28 @@ mod test { let res = expr.evaluate_pruning(ctx).unwrap(); match res { - PruningIntermediate::IntermediateResult(PruningResult::AlwaysFalse) => {} + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysFalse]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn is_null_prunes_when_no_nulls() { + let stats = Arc::new(MockPruningStatistics::from_scalar( + "b", + ScalarValue::Int32(Some(1)), + 1, + 3, + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = is_null(Arc::new(Column::new("b", 0))).unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysFalse]) + } other => panic!("unexpected result: {other:?}"), } } From fd0617594e48f614484840852237cceda2b0f2dc Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 24 Dec 2025 13:14:29 +0800 Subject: [PATCH 08/11] impl InList --- datafusion/common/src/pruning.rs | 15 ++ .../physical-expr-common/src/physical_expr.rs | 40 +++++ .../physical-expr/src/expressions/binary.rs | 13 +- .../physical-expr/src/expressions/column.rs | 14 +- .../physical-expr/src/expressions/in_list.rs | 82 +++++++++ .../physical-expr/src/expressions/is_null.rs | 2 +- .../physical-expr/src/expressions/literal.rs | 19 +- 
datafusion/physical-expr/tests/pruning.rs | 167 +++++++++++++++++- .../physical-expr/tests/pruning_utils.rs | 35 ++++ 9 files changed, 367 insertions(+), 20 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 5a7598ea1f299..2540058a3c80b 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -36,6 +36,8 @@ use crate::{ColumnStatistics, ScalarValue}; /// /// 3. Whether the values in a column are contained in a set of literals /// +/// 4. Known value sets for columns, on a per-container basis +/// /// # Vectorized Interface /// /// Information for containers / files are returned as Arrow [`ArrayRef`], so @@ -105,6 +107,19 @@ pub trait PruningStatistics { /// [`UInt64Array`]: arrow::array::UInt64Array fn row_counts(&self, column: &Column) -> Option; + /// Return the known set of values for the named column in each container. + /// + /// The returned vector must contain [`Self::num_containers`] entries. + /// Each entry corresponds to a container and can be: + /// * `Some(HashSet)` if the values for that container are known + /// * `None` if the set is unknown for that container. + /// + /// If set statistics are not available for any container, return `None` + /// (the default). + fn value_sets(&self, _column: &Column) -> Option>>> { + None + } + /// Returns [`BooleanArray`] where each row represents information known /// about specific literal `values` in a column. /// diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index edb369780504f..a7b86a3816bdc 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -16,6 +16,7 @@ // under the License. 
use std::any::Any; +use std::collections::HashSet; use std::fmt; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -534,6 +535,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { Ok(PruningIntermediate::IntermediateStats(ColumnStats { range_stats, null_stats, + set_stats: None, })) } } @@ -572,10 +574,16 @@ pub struct NullStats { length: usize, } +#[derive(Debug, Clone)] +pub struct SetStats { + sets: Vec>>, +} + #[derive(Debug, Clone)] pub struct ColumnStats { pub range_stats: Option, pub null_stats: Option, + pub set_stats: Option, } impl RangeStats { @@ -670,11 +678,39 @@ impl NullStats { } } +impl SetStats { + pub fn new(sets: Vec>>, length: usize) -> Result { + assert_eq_or_internal_err!( + sets.len(), + length, + "Set stats length mismatch for pruning statistics" + ); + Ok(Self { sets }) + } + + pub fn len(&self) -> usize { + self.sets.len() + } + + pub fn value_sets(&self) -> &[Option>] { + &self.sets + } +} + impl ColumnStats { pub fn new(range_stats: Option, null_stats: Option) -> Self { + Self::new_with_set_stats(range_stats, null_stats, None) + } + + pub fn new_with_set_stats( + range_stats: Option, + null_stats: Option, + set_stats: Option, + ) -> Self { Self { range_stats, null_stats, + set_stats, } } @@ -685,6 +721,10 @@ impl ColumnStats { pub fn null_stats(&self) -> Option<&NullStats> { self.null_stats.as_ref() } + + pub fn set_stats(&self) -> Option<&SetStats> { + self.set_stats.as_ref() + } } // TODO: should include length (container count) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f584d3eaa589b..e768e6cac8f36 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -359,16 +359,16 @@ impl PhysicalExpr for BinaryExpr { ])); } _ => { - return Ok(PruningIntermediate::IntermediateResult( - vec![PruningResult::Unknown], - )); + return 
Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown, + ])); } } } _ => { - return Ok(PruningIntermediate::IntermediateResult( - vec![PruningResult::Unknown], - )); + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown, + ])); } }; @@ -1116,6 +1116,7 @@ impl BinaryExpr { Ok(PruningIntermediate::IntermediateStats(ColumnStats { range_stats, null_stats, + set_stats: None, })) } } diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 6ec3b48a27996..d29c52c611130 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -31,7 +31,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Column as DFColumn, Result, internal_err, plan_err}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ - ColumnStats, NullStats, PruningContext, PruningIntermediate, RangeStats, + ColumnStats, NullStats, PruningContext, PruningIntermediate, RangeStats, SetStats, }; /// Represents the column at a given index in a RecordBatch @@ -169,10 +169,14 @@ impl PhysicalExpr for Column { None }; - Ok(PruningIntermediate::IntermediateStats(ColumnStats::new( - range_stats, - null_stats, - ))) + let set_stats = stats + .value_sets(&pruning_column) + .map(|sets| SetStats::new(sets, num_containers)) + .transpose()?; + + Ok(PruningIntermediate::IntermediateStats( + ColumnStats::new_with_set_stats(range_stats, null_stats, set_stats), + )) } fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index b6b67c85c4881..33ea1c7fbead6 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -18,6 +18,7 @@ //! 
Implementation of `InList` expressions: [`InListExpr`] use std::any::Any; +use std::collections::HashSet as StdHashSet; use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -37,6 +38,9 @@ use datafusion_common::{ exec_err, }; use datafusion_expr::{ColumnarValue, expr_vec_fmt}; +use datafusion_physical_expr_common::physical_expr::{ + PruningContext, PruningIntermediate, PruningResult, RangeStats, +}; use ahash::RandomState; use datafusion_common::HashMap; @@ -682,6 +686,29 @@ impl InListExpr { Ok(Self::new(expr, list, negated, static_filter)) } + + fn list_value_set( + &self, + ctx: &Arc, + ) -> Result>> { + let mut values = StdHashSet::new(); + for expr in &self.list { + match expr.evaluate_pruning(Arc::clone(ctx))? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(RangeStats::Scalar { value, .. }) => { + if !value.is_null() { + values.insert(value); + } + } + _ => return Ok(None), + } + } + _ => return Ok(None), + } + } + Ok(Some(values)) + } } impl std::fmt::Display for InListExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -828,6 +855,61 @@ impl PhysicalExpr for InListExpr { Ok(ColumnarValue::Array(Arc::new(r))) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let left = self.expr.evaluate_pruning(Arc::clone(&ctx))?; + let stats = match left { + PruningIntermediate::IntermediateStats(stats) => stats, + _ => unreachable!("Input must have IntermediateStats type"), + }; + + let container_len = stats + .set_stats() + .map(|s| s.len()) + .or_else(|| stats.range_stats().map(|s| s.len())) + .or_else(|| stats.null_stats().map(|s| s.len())) + .unwrap_or(1); + + let Some(list_values) = self.list_value_set(&ctx)? 
else { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown; + container_len + ])); + }; + + let Some(set_stats) = stats.set_stats() else { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown; + container_len + ])); + }; + + let mut results = Vec::with_capacity(set_stats.len()); + for container_values in set_stats.value_sets() { + let res = match container_values { + None => PruningResult::Unknown, + Some(values) => match values.is_empty() + || values.iter().any(|v| v.is_null()) + { + true => PruningResult::Unknown, + false => { + let is_subset = values.iter().all(|v| list_values.contains(v)); + let is_disjoint = values.iter().all(|v| !list_values.contains(v)); + match (is_subset, is_disjoint, self.negated) { + (true, _, false) => PruningResult::AlwaysTrue, + (true, _, true) => PruningResult::AlwaysFalse, + (_, true, false) => PruningResult::AlwaysFalse, + (_, true, true) => PruningResult::AlwaysTrue, + _ => PruningResult::Unknown, + } + } + }, + }; + results.push(res); + } + + Ok(PruningIntermediate::IntermediateResult(results)) + } + fn children(&self) -> Vec<&Arc> { let mut children = vec![&self.expr]; children.extend(&self.list); diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index d5a38d3276e9f..a66740c116095 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -27,7 +27,7 @@ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ - PruningContext, PruningIntermediate, PruningResult, + PruningContext, PruningIntermediate, }; use std::hash::Hash; use std::{any::Any, sync::Arc}; diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index f054c59e33d9c..49b12d411b8c0 100644 --- 
a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -18,6 +18,7 @@ //! Literal expressions for physical operations use std::any::Any; +use std::collections::HashSet; use std::hash::Hash; use std::sync::Arc; @@ -35,7 +36,7 @@ use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; use datafusion_physical_expr_common::physical_expr::{ - ColumnStats, PruningContext, PruningIntermediate, RangeStats, + ColumnStats, PruningContext, PruningIntermediate, RangeStats, SetStats, }; /// Represents a literal value @@ -118,10 +119,18 @@ impl PhysicalExpr for Literal { fn evaluate_pruning(&self, ctx: Arc) -> Result { let length = ctx.statistics().num_containers(); let range = RangeStats::new_scalar(self.value.clone(), length)?; - Ok(PruningIntermediate::IntermediateStats(ColumnStats::new( - Some(range), - None, - ))) + let set_stats = if self.value.is_null() { + None + } else { + let mut set = HashSet::new(); + set.insert(self.value.clone()); + let sets = vec![Some(set); length]; + Some(SetStats::new(sets, length)?) 
+ }; + + Ok(PruningIntermediate::IntermediateStats( + ColumnStats::new_with_set_stats(Some(range), None, set_stats), + )) } fn children(&self) -> Vec<&Arc> { diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 6f13d51b5f1f3..5a68fae9dd8d7 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -18,14 +18,18 @@ mod pruning_utils; mod test { + use std::collections::HashSet; use std::sync::Arc; - use arrow::array::{Int32Array, Int64Array, UInt64Array}; - use arrow::datatypes::DataType; + use arrow::array::{ArrayRef, BooleanArray, Int32Array, Int64Array, UInt64Array}; + use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::ScalarValue; + use datafusion_common::pruning::PruningStatistics; use datafusion_expr::Operator; use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_expr::expressions::{BinaryExpr, Column, is_null, lit}; + use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, in_list, is_null, lit, + }; use datafusion_physical_expr_common::physical_expr::{ ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, }; @@ -94,6 +98,81 @@ mod test { } } + #[test] + fn column_set_stats_present() { + let mins = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let maxs = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let null_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(0))]).unwrap(); + let row_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(1))]).unwrap(); + + let mut set = HashSet::new(); + set.insert(ScalarValue::Utf8(Some("foo".to_string()))); + set.insert(ScalarValue::Utf8(Some("bar".to_string()))); + + let pruning_stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + mins, + maxs, + null_counts, + Some(row_counts), + Some(vec![Some(set.clone())]), 
+ )); + + let context = Arc::new(PruningContext::new(pruning_stats)); + let column_expr = Column::new("c", 0); + + match column_expr.evaluate_pruning(context).unwrap() { + PruningIntermediate::IntermediateStats(stats) => { + let set_stats = stats.set_stats().expect("set stats"); + assert_eq!(set_stats.len(), 1); + let container_set = set_stats.value_sets()[0] + .as_ref() + .expect("value set present"); + assert_eq!(container_set, &set); + } + other => panic!("expected stats, got {other:?}"), + } + } + + #[derive(Clone)] + struct LenOnlyStats(usize); + + impl PruningStatistics for LenOnlyStats { + fn min_values(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn max_values(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn num_containers(&self) -> usize { + self.0 + } + + fn null_counts(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn row_counts(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn contained( + &self, + _: &datafusion_common::Column, + _: &HashSet, + ) -> Option { + None + } + } + #[test] fn lit_basic() { let lit_expr = lit(5); @@ -104,10 +183,33 @@ mod test { PruningIntermediate::IntermediateStats(ColumnStats { range_stats: Some(RangeStats::Scalar { value, length }), null_stats, + set_stats, }) => { assert_eq!(value, ScalarValue::Int32(Some(5))); assert_eq!(length, 0); assert!(null_stats.is_none()); + let set_stats = set_stats.expect("set stats"); + assert_eq!(set_stats.len(), 0); + } + other => panic!("unexpected pruning result: {other:?}"), + } + } + + #[test] + fn literal_set_stats_present() { + let lit_expr = lit("foo"); + let ctx = Arc::new(PruningContext::new(Arc::new(LenOnlyStats(2)))); + let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); + + match stat { + PruningIntermediate::IntermediateStats(ColumnStats { set_stats, .. 
}) => { + let set_stats = set_stats.expect("set stats"); + assert_eq!(set_stats.len(), 2); + for values in set_stats.value_sets() { + let set = values.as_ref().expect("value set"); + assert_eq!(set.len(), 1); + assert!(set.contains(&ScalarValue::Utf8(Some("foo".to_string())))); + } } other => panic!("unexpected pruning result: {other:?}"), } @@ -245,6 +347,7 @@ mod test { length, }), null_stats, + set_stats, }) => { assert_eq!(length, 1); let min_scalar = @@ -256,6 +359,7 @@ mod test { assert_eq!(min_scalar, ScalarValue::Int64(Some(70))); assert_eq!(max_scalar, ScalarValue::Int64(Some(140))); assert!(null_stats.is_none()); + assert!(set_stats.is_none()); } other => panic!("unexpected pruning result: {other:?}"), } @@ -290,4 +394,61 @@ mod test { other => panic!("unexpected result: {other:?}"), } } + + #[test] + fn in_list_literal_set_prunes() { + let ctx = Arc::new(PruningContext::new(Arc::new(LenOnlyStats(1)))); + let schema = Schema::new(Vec::::new()); + let expr = + in_list(lit("foo"), vec![lit("foo"), lit("bar")], &false, &schema).unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysTrue]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn in_list_prunes_with_set_stats() { + let mins = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let maxs = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let null_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(0))]).unwrap(); + + let mut value_set = HashSet::new(); + value_set.insert(ScalarValue::Utf8(Some("foo".to_string()))); + value_set.insert(ScalarValue::Utf8(Some("bar".to_string()))); + + let stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + mins, + maxs, + null_counts, + None, + Some(vec![Some(value_set)]), + )); + + let ctx = 
Arc::new(PruningContext::new(stats)); + let schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]); + let expr = in_list( + Arc::new(Column::new("c", 0)), + vec![lit("toy"), lit("book")], + &false, + &schema, + ) + .unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::AlwaysFalse]) + } + other => panic!("unexpected result: {other:?}"), + } + } } diff --git a/datafusion/physical-expr/tests/pruning_utils.rs b/datafusion/physical-expr/tests/pruning_utils.rs index 67144ca3eb9ed..e1ff7d16b9392 100644 --- a/datafusion/physical-expr/tests/pruning_utils.rs +++ b/datafusion/physical-expr/tests/pruning_utils.rs @@ -55,6 +55,7 @@ pub struct MockPruningStatistics { null_counts: ArrayRef, row_counts: Option, num_containers: usize, + value_sets: Option>>>, } impl MockPruningStatistics { @@ -64,8 +65,33 @@ impl MockPruningStatistics { max_values: ArrayRef, null_counts: ArrayRef, row_counts: Option, + ) -> Self { + Self::new_with_sets( + column, + min_values, + max_values, + null_counts, + row_counts, + None, + ) + } + + pub fn new_with_sets( + column: impl Into, + min_values: ArrayRef, + max_values: ArrayRef, + null_counts: ArrayRef, + row_counts: Option, + value_sets: Option>>>, ) -> Self { let num_containers = min_values.len(); + if let Some(value_sets) = value_sets.as_ref() { + assert_eq!( + value_sets.len(), + num_containers, + "value sets must match container count" + ); + } Self { column: column.into(), min_values, @@ -73,6 +99,7 @@ impl MockPruningStatistics { null_counts, row_counts, num_containers, + value_sets, } } @@ -128,6 +155,14 @@ impl PruningStatistics for MockPruningStatistics { .flatten() } + fn value_sets(&self, column: &Column) -> Option>>> { + if column.name == self.column { + self.value_sets.clone() + } else { + None + } + } + fn contained(&self, _: &Column, _: &HashSet) -> Option { None } From ffce42ac5acaf23fc685f96e1ef9b55c0557af6d Mon Sep 17 
00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 24 Dec 2025 16:00:30 +0800 Subject: [PATCH 09/11] impl upper() udf set stat propagation --- datafusion/expr/src/async_udf.rs | 7 ++ datafusion/expr/src/lib.rs | 1 + datafusion/expr/src/udf.rs | 28 ++++++- datafusion/functions/src/string/upper.rs | 51 +++++++++++- .../physical-expr-common/src/physical_expr.rs | 55 ++++++++++++- .../physical-expr/src/expressions/binary.rs | 48 +++++++++++- .../physical-expr/src/scalar_function.rs | 8 ++ datafusion/physical-expr/tests/pruning.rs | 77 +++++++++++++++++++ 8 files changed, 269 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/async_udf.rs b/datafusion/expr/src/async_udf.rs index 8afdfda68dea0..be9df0d57bee9 100644 --- a/datafusion/expr/src/async_udf.rs +++ b/datafusion/expr/src/async_udf.rs @@ -125,6 +125,13 @@ impl ScalarUDFImpl for AsyncScalarUDF { fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { internal_err!("async functions should not be called directly") } + + fn propagate_set_stats( + &self, + child_set_stats: &[crate::SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } } impl Display for AsyncScalarUDF { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index b02254bb7d486..aa0bdb08afc1b 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -96,6 +96,7 @@ pub use datafusion_expr_common::signature::{ TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility, }; pub use datafusion_expr_common::type_coercion::binary; +pub use datafusion_physical_expr_common::physical_expr::SetStats; pub use expr::{ Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GroupingSet, Like, Sort as SortExpr, TryCast, WindowFunctionDefinition, diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 26d7fc99cb17c..8e4f3b0036aa1 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -22,7 +22,7 @@ use 
crate::expr::schema_name_from_exprs_comma_separated_without_space; use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::sort_properties::{ExprProperties, SortProperties}; use crate::udf_eq::UdfEq; -use crate::{ColumnarValue, Documentation, Expr, Signature}; +use crate::{ColumnarValue, Documentation, Expr, SetStats, Signature}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -321,6 +321,14 @@ impl ScalarUDF { self.inner.propagate_constraints(interval, inputs) } + /// Propagate set statistics through this function. + pub fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } + /// Calculates the [`SortProperties`] of this function based on its /// children's properties. pub fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { @@ -785,6 +793,17 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { Ok(Some(vec![])) } + /// Propagate set statistics from children through this function. + /// + /// Returns `Ok(None)` by default, indicating the function does not support + /// set-stat based pruning. + fn propagate_set_stats( + &self, + _child_set_stats: &[SetStats], + ) -> Result> { + Ok(None) + } + /// Calculates the [`SortProperties`] of this function based on its children's properties. fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { if !self.preserves_lex_ordering(inputs)? 
{ @@ -949,6 +968,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.propagate_constraints(interval, inputs) } + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } + fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { self.inner.output_ordering(inputs) } diff --git a/datafusion/functions/src/string/upper.rs b/datafusion/functions/src/string/upper.rs index a2a7db1848f59..819bfaf400510 100644 --- a/datafusion/functions/src/string/upper.rs +++ b/datafusion/functions/src/string/upper.rs @@ -18,14 +18,15 @@ use crate::string::common::to_upper; use crate::utils::utf8_to_str_type; use arrow::datatypes::DataType; -use datafusion_common::Result; use datafusion_common::types::logical_string; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ - Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - TypeSignatureClass, Volatility, + Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, SetStats, + Signature, TypeSignatureClass, Volatility, }; use datafusion_macros::user_doc; use std::any::Any; +use std::collections::HashSet; #[user_doc( doc_section(label = "String Functions"), @@ -88,6 +89,50 @@ impl ScalarUDFImpl for UpperFunc { to_upper(&args.args, "upper") } + /// For input stat, capitalize the set statistics. 
+ /// + /// Example: + /// Input expression `c` has set stat `{'foo', 'bar'}` + /// This function `upper(c)` will propagate the set stat to `{'FOO', 'BAR'}` + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + if child_set_stats.len() != 1 { + return Ok(None); + } + + let input_sets = &child_set_stats[0]; + let mut upper_sets = Vec::with_capacity(input_sets.len()); + + for values in input_sets.value_sets() { + let Some(values) = values else { + upper_sets.push(None); + continue; + }; + + let mut upper_values = HashSet::with_capacity(values.len()); + for value in values { + let upper_value = match value { + ScalarValue::Utf8(v) => { + ScalarValue::Utf8(v.as_ref().map(|s| s.to_uppercase())) + } + ScalarValue::LargeUtf8(v) => { + ScalarValue::LargeUtf8(v.as_ref().map(|s| s.to_uppercase())) + } + ScalarValue::Utf8View(v) => { + ScalarValue::Utf8View(v.as_ref().map(|s| s.to_uppercase())) + } + _ => return Ok(None), + }; + upper_values.insert(upper_value); + } + upper_sets.push(Some(upper_values)); + } + + SetStats::new(upper_sets, input_sets.len()).map(Some) + } + fn documentation(&self) -> Option<&Documentation> { self.doc() } diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a7b86a3816bdc..42c3cf8f2b63d 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -451,6 +451,13 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { Ok(None) } + fn propagate_set_stats( + &self, + _child_set_stats: &[SetStats], + ) -> Result> { + Ok(None) + } + fn evaluate_pruning(&self, ctx: Arc) -> Result { // Default impl for stats-propagation nodes (e.g. arithmetic expressions): // 1) Evaluate pruning for all children. 
@@ -463,8 +470,10 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { let mut range_complete = true; let mut null_complete = true; + let mut set_complete = true; let mut child_range_stats = Vec::with_capacity(children.len()); let mut child_null_stats = Vec::with_capacity(children.len()); + let mut child_set_stats = Vec::with_capacity(children.len()); for child in children { match child.evaluate_pruning(Arc::clone(&ctx))? { @@ -486,6 +495,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { null_complete = false; } } + + match stats.set_stats { + Some(set_stats) if set_complete => { + child_set_stats.push(set_stats); + } + _ => { + set_complete = false; + } + } } // Without node-specific semantics, we can't combine a final pruning result here. other => return Ok(other), @@ -522,6 +540,21 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { None }; + let set_stats = if set_complete && !child_set_stats.is_empty() { + if let Some((first, rest)) = child_set_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Set stats length mismatch between pruning children" + ); + } + } + self.propagate_set_stats(&child_set_stats)? 
+ } else { + None + }; + if let (Some(range_stats), Some(null_stats)) = (range_stats.as_ref(), null_stats.as_ref()) { @@ -532,10 +565,30 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { ); } + if let (Some(range_stats), Some(set_stats)) = + (range_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + set_stats.len(), + "Range and set stats length mismatch for pruning" + ); + } + + if let (Some(null_stats), Some(set_stats)) = + (null_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + null_stats.length, + set_stats.len(), + "Null and set stats length mismatch for pruning" + ); + } + Ok(PruningIntermediate::IntermediateStats(ColumnStats { range_stats, null_stats, - set_stats: None, + set_stats, })) } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index e768e6cac8f36..a8e4204a9336c 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -1045,8 +1045,10 @@ impl BinaryExpr { let mut range_complete = true; let mut null_complete = true; + let mut set_complete = true; let mut child_range_stats = Vec::with_capacity(children.len()); let mut child_null_stats = Vec::with_capacity(children.len()); + let mut child_set_stats = Vec::with_capacity(children.len()); for child in children { match child.evaluate_pruning(Arc::clone(&ctx))? 
{ @@ -1068,6 +1070,15 @@ impl BinaryExpr { null_complete = false; } } + + match stats.set_stats { + Some(set_stats) if set_complete => { + child_set_stats.push(set_stats); + } + _ => { + set_complete = false; + } + } } other => return Ok(other), } @@ -1103,6 +1114,21 @@ impl BinaryExpr { None }; + let set_stats = if set_complete && !child_set_stats.is_empty() { + if let Some((first, rest)) = child_set_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Set stats length mismatch between pruning children" + ); + } + } + self.propagate_set_stats(&child_set_stats)? + } else { + None + }; + if let (Some(range_stats), Some(null_stats)) = (range_stats.as_ref(), null_stats.as_ref()) { @@ -1113,10 +1139,30 @@ impl BinaryExpr { ); } + if let (Some(range_stats), Some(set_stats)) = + (range_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + set_stats.len(), + "Range and set stats length mismatch for pruning" + ); + } + + if let (Some(null_stats), Some(set_stats)) = + (null_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + null_stats.len(), + set_stats.len(), + "Null and set stats length mismatch for pruning" + ); + } + Ok(PruningIntermediate::IntermediateStats(ColumnStats { range_stats, null_stats, - set_stats: None, + set_stats, })) } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index e6a6db75bebd7..182fda5474036 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -48,6 +48,7 @@ use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, Volatility, expr_vec_fmt, }; +use datafusion_physical_expr_common::physical_expr::SetStats; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { @@ -336,6 +337,13 @@ impl PhysicalExpr for ScalarFunctionExpr { 
self.fun.propagate_constraints(interval, children) } + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.fun.propagate_set_stats(child_set_stats) + } + fn get_properties(&self, children: &[ExprProperties]) -> Result { let sort_properties = self.fun.output_ordering(children)?; let preserves_lex_ordering = self.fun.preserves_lex_ordering(children)?; diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 5a68fae9dd8d7..416b5f7d83884 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -24,9 +24,12 @@ mod test { use arrow::array::{ArrayRef, BooleanArray, Int32Array, Int64Array, UInt64Array}; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::ScalarValue; + use datafusion_common::config::ConfigOptions; use datafusion_common::pruning::PruningStatistics; use datafusion_expr::Operator; + use datafusion_functions::string; use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::{ BinaryExpr, Column, in_list, is_null, lit, }; @@ -451,4 +454,78 @@ mod test { other => panic!("unexpected result: {other:?}"), } } + + #[test] + fn in_list_prunes_upper_with_set_stats() { + let mins = ScalarValue::iter_to_array(vec![ + ScalarValue::Utf8(Some("electronic".to_string())), + ScalarValue::Utf8(Some("chair".to_string())), + ScalarValue::Utf8(Some("book".to_string())), + ]) + .unwrap(); + let maxs = ScalarValue::iter_to_array(vec![ + ScalarValue::Utf8(Some("electronic".to_string())), + ScalarValue::Utf8(Some("chair".to_string())), + ScalarValue::Utf8(Some("pencil".to_string())), + ]) + .unwrap(); + let null_counts = ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(0)), + ]) + .unwrap(); + + let mut set_true = HashSet::new(); + 
set_true.insert(ScalarValue::Utf8(Some("electronic".to_string()))); + + let mut set_false = HashSet::new(); + set_false.insert(ScalarValue::Utf8(Some("chair".to_string()))); + + let mut set_unknown = HashSet::new(); + set_unknown.insert(ScalarValue::Utf8(Some("book".to_string()))); + set_unknown.insert(ScalarValue::Utf8(Some("pencil".to_string()))); + + let stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + mins, + maxs, + null_counts, + None, + Some(vec![Some(set_true), Some(set_false), Some(set_unknown)]), + )); + + let ctx = Arc::new(PruningContext::new(stats)); + let schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]); + let upper_udf = string::upper(); + let upper_expr = ScalarFunctionExpr::try_new( + upper_udf, + vec![Arc::new(Column::new("c", 0)) as Arc], + &schema, + Arc::new(ConfigOptions::new()), + ) + .unwrap(); + + let expr = in_list( + Arc::new(upper_expr), + vec![lit("ELECTRONIC"), lit("BOOK")], + &false, + &schema, + ) + .unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![ + PruningResult::AlwaysTrue, + PruningResult::AlwaysFalse, + PruningResult::Unknown + ] + ); + } + other => panic!("unexpected result: {other:?}"), + } + } } From 62aabcce3bcaebc75476ae4eb91bfdaa33f7944c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Thu, 25 Dec 2025 11:32:28 +0800 Subject: [PATCH 10/11] move pruning to a separate file --- datafusion/functions/src/string/upper.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 221 +---------------- .../src/physical_expr/pruning.rs | 224 ++++++++++++++++++ .../physical-expr/src/expressions/binary.rs | 72 +++++- .../physical-expr/src/expressions/in_list.rs | 8 +- .../physical-expr/src/expressions/is_null.rs | 4 +- datafusion/physical-expr/tests/pruning.rs | 176 +++++++++++--- .../physical-expr/tests/pruning_utils.rs | 101 ++++++-- 8 files changed, 544 insertions(+), 264 
deletions(-) create mode 100644 datafusion/physical-expr-common/src/physical_expr/pruning.rs diff --git a/datafusion/functions/src/string/upper.rs b/datafusion/functions/src/string/upper.rs index 819bfaf400510..20220038b083a 100644 --- a/datafusion/functions/src/string/upper.rs +++ b/datafusion/functions/src/string/upper.rs @@ -90,7 +90,7 @@ impl ScalarUDFImpl for UpperFunc { } /// For input stat, capitalize the set statistics. - /// + /// /// Example: /// Input expression `c` has set stat `{'foo', 'bar'}` /// This function `upper(c)` will propagate the set stat to `{'FOO', 'BAR'}` diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 42c3cf8f2b63d..fe7498ce2b87b 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -16,7 +16,6 @@ // under the License. use std::any::Any; -use std::collections::HashSet; use std::fmt; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -28,7 +27,6 @@ use arrow::array::{ArrayRef, BooleanArray, new_empty_array}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::pruning::PruningStatistics; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -42,6 +40,13 @@ use datafusion_expr_common::statistics::Distribution; use itertools::izip; +mod pruning; + +pub use pruning::{ + ColumnStats, NullStats, PruningContext, PruningIntermediate, PruningResult, + RangeStats, SetStats, +}; + /// Shared [`PhysicalExpr`]. 
pub type PhysicalExprRef = Arc; @@ -529,8 +534,8 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { if let Some((first, rest)) = child_null_stats.split_first() { for stats in rest { assert_eq_or_internal_err!( - first.length, - stats.length, + first.len(), + stats.len(), "Null stats length mismatch between pruning children" ); } @@ -560,7 +565,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { { assert_eq_or_internal_err!( range_stats.len(), - null_stats.length, + null_stats.len(), "Range and null stats length mismatch for pruning" ); } @@ -579,7 +584,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { (null_stats.as_ref(), set_stats.as_ref()) { assert_eq_or_internal_err!( - null_stats.length, + null_stats.len(), set_stats.len(), "Null and set stats length mismatch for pruning" ); @@ -593,207 +598,6 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { } } -// Pruner Common -/// e.g. for x > 5 -/// bucket 1 has stat [10,15] -> AlwaysTrue -/// bucket 2 has stat [0,5] -> AlwaysFalse -/// bucket 3 has stat [0,10] -> Unknown -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PruningResult { - AlwaysTrue, - AlwaysFalse, - Unknown, -} - -#[derive(Debug, Clone)] -pub enum RangeStats { - Values { - mins: Option, - maxs: Option, - length: usize, - }, - /// Represents a uniform literal value across all containers. - /// This variant make it easy to compare between literals and normal ranges representing - /// each containers' value range. 
- /// - /// TODO: remove length -- seems redundant - Scalar { value: ScalarValue, length: usize }, -} - -#[derive(Debug, Clone)] -pub struct NullStats { - null_counts: Option, - row_counts: Option, - length: usize, -} - -#[derive(Debug, Clone)] -pub struct SetStats { - sets: Vec>>, -} - -#[derive(Debug, Clone)] -pub struct ColumnStats { - pub range_stats: Option, - pub null_stats: Option, - pub set_stats: Option, -} - -impl RangeStats { - pub fn new( - mins: Option, - maxs: Option, - length: usize, - ) -> Result { - if let Some(ref mins) = mins { - assert_eq_or_internal_err!( - mins.len(), - length, - "Range mins length mismatch for pruning statistics" - ); - } - if let Some(ref maxs) = maxs { - assert_eq_or_internal_err!( - maxs.len(), - length, - "Range maxs length mismatch for pruning statistics" - ); - } - Ok(Self::Values { mins, maxs, length }) - } - - /// Create range stats for a constant literal across all containers. - /// - pub fn new_scalar(value: ScalarValue, length: usize) -> Result { - Ok(Self::Scalar { value, length }) - } - - pub fn len(&self) -> usize { - match self { - RangeStats::Values { length, .. } | RangeStats::Scalar { length, .. 
} => { - *length - } - } - } -} - -pub struct PruningContext { - stats: Arc, -} - -impl PruningContext { - pub fn new(stats: Arc) -> Self { - Self { stats } - } - - pub fn statistics(&self) -> &Arc { - &self.stats - } -} - -impl NullStats { - pub fn new( - null_counts: Option, - row_counts: Option, - length: usize, - ) -> Result { - if let Some(ref null_counts) = null_counts { - assert_eq_or_internal_err!( - null_counts.len(), - length, - "Null counts length mismatch for pruning statistics" - ); - } - if let Some(ref row_counts) = row_counts { - assert_eq_or_internal_err!( - row_counts.len(), - length, - "Row counts length mismatch for pruning statistics" - ); - } - Ok(Self { - null_counts, - row_counts, - length, - }) - } - - pub fn len(&self) -> usize { - self.length - } - - pub fn null_counts(&self) -> Option<&ArrayRef> { - self.null_counts.as_ref() - } - - pub fn row_counts(&self) -> Option<&ArrayRef> { - self.row_counts.as_ref() - } -} - -impl SetStats { - pub fn new(sets: Vec>>, length: usize) -> Result { - assert_eq_or_internal_err!( - sets.len(), - length, - "Set stats length mismatch for pruning statistics" - ); - Ok(Self { sets }) - } - - pub fn len(&self) -> usize { - self.sets.len() - } - - pub fn value_sets(&self) -> &[Option>] { - &self.sets - } -} - -impl ColumnStats { - pub fn new(range_stats: Option, null_stats: Option) -> Self { - Self::new_with_set_stats(range_stats, null_stats, None) - } - - pub fn new_with_set_stats( - range_stats: Option, - null_stats: Option, - set_stats: Option, - ) -> Self { - Self { - range_stats, - null_stats, - set_stats, - } - } - - pub fn range_stats(&self) -> Option<&RangeStats> { - self.range_stats.as_ref() - } - - pub fn null_stats(&self) -> Option<&NullStats> { - self.null_stats.as_ref() - } - - pub fn set_stats(&self) -> Option<&SetStats> { - self.set_stats.as_ref() - } -} - -// TODO: should include length (container count) -#[derive(Debug, Clone)] -pub enum PruningIntermediate { - IntermediateStats(ColumnStats), 
- IntermediateResult(Vec), -} - -impl PruningIntermediate { - /// Create an `IntermediateStats` variant with no range or null statistics. - pub fn empty_stats() -> Self { - Self::IntermediateStats(ColumnStats::new(None, None)) - } -} - #[deprecated( since = "50.0.0", note = "Use `datafusion_expr_common::dyn_eq` instead" @@ -1024,10 +828,9 @@ pub fn is_volatile(expr: &Arc) -> bool { #[cfg(test)] mod test { - use crate::physical_expr::{PhysicalExpr, RangeStats}; + use crate::physical_expr::PhysicalExpr; use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Schema}; - use datafusion_common::ScalarValue; use datafusion_expr_common::columnar_value::ColumnarValue; use std::fmt::{Display, Formatter}; use std::sync::Arc; diff --git a/datafusion/physical-expr-common/src/physical_expr/pruning.rs b/datafusion/physical-expr-common/src/physical_expr/pruning.rs new file mode 100644 index 0000000000000..83a81d35a3907 --- /dev/null +++ b/datafusion/physical-expr-common/src/physical_expr/pruning.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashSet; +use std::sync::Arc; + +use arrow::array::ArrayRef; +use datafusion_common::pruning::PruningStatistics; +use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err}; + +// Pruner Common +/// e.g. for x > 5 +/// bucket 1 has stat [10,15] -> KeepAll +/// bucket 2 has stat [0,5] -> SkipAll (prune it) +/// bucket 3 has stat [0,10] -> Unknown +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PruningResult { + KeepAll, + SkipAll, + Unknown, +} + +#[derive(Debug, Clone)] +pub enum RangeStats { + Values { + mins: Option, + maxs: Option, + length: usize, + }, + /// Represents a uniform literal value across all containers. + /// This variant makes it easy to compare between literals and normal ranges representing + /// each container's value range. + /// + /// TODO: remove length -- seems redundant + Scalar { value: ScalarValue, length: usize }, +} + +#[derive(Debug, Clone)] +pub struct NullStats { + null_counts: Option, + row_counts: Option, + length: usize, +} + +#[derive(Debug, Clone)] +pub struct SetStats { + sets: Vec>>, +} + +#[derive(Debug, Clone)] +pub struct ColumnStats { + pub range_stats: Option, + pub null_stats: Option, + pub set_stats: Option, +} + +impl RangeStats { + pub fn new( + mins: Option, + maxs: Option, + length: usize, + ) -> Result { + if let Some(ref mins) = mins { + assert_eq_or_internal_err!( + mins.len(), + length, + "Range mins length mismatch for pruning statistics" + ); + } + if let Some(ref maxs) = maxs { + assert_eq_or_internal_err!( + maxs.len(), + length, + "Range maxs length mismatch for pruning statistics" + ); + } + Ok(Self::Values { mins, maxs, length }) + } + + /// Create range stats for a constant literal across all containers. + /// + pub fn new_scalar(value: ScalarValue, length: usize) -> Result { + Ok(Self::Scalar { value, length }) + } + + pub fn len(&self) -> usize { + match self { + RangeStats::Values { length, .. } | RangeStats::Scalar { length, .. 
} => { + *length + } + } + } +} + +pub struct PruningContext { + stats: Arc, +} + +impl PruningContext { + pub fn new(stats: Arc) -> Self { + Self { stats } + } + + pub fn statistics(&self) -> &Arc { + &self.stats + } +} + +impl NullStats { + pub fn new( + null_counts: Option, + row_counts: Option, + length: usize, + ) -> Result { + if let Some(ref null_counts) = null_counts { + assert_eq_or_internal_err!( + null_counts.len(), + length, + "Null counts length mismatch for pruning statistics" + ); + } + if let Some(ref row_counts) = row_counts { + assert_eq_or_internal_err!( + row_counts.len(), + length, + "Row counts length mismatch for pruning statistics" + ); + } + Ok(Self { + null_counts, + row_counts, + length, + }) + } + + pub fn len(&self) -> usize { + self.length + } + + pub fn null_counts(&self) -> Option<&ArrayRef> { + self.null_counts.as_ref() + } + + pub fn row_counts(&self) -> Option<&ArrayRef> { + self.row_counts.as_ref() + } +} + +impl SetStats { + pub fn new(sets: Vec>>, length: usize) -> Result { + assert_eq_or_internal_err!( + sets.len(), + length, + "Set stats length mismatch for pruning statistics" + ); + Ok(Self { sets }) + } + + pub fn len(&self) -> usize { + self.sets.len() + } + + pub fn value_sets(&self) -> &[Option>] { + &self.sets + } +} + +impl ColumnStats { + pub fn new(range_stats: Option, null_stats: Option) -> Self { + Self::new_with_set_stats(range_stats, null_stats, None) + } + + pub fn new_with_set_stats( + range_stats: Option, + null_stats: Option, + set_stats: Option, + ) -> Self { + Self { + range_stats, + null_stats, + set_stats, + } + } + + pub fn range_stats(&self) -> Option<&RangeStats> { + self.range_stats.as_ref() + } + + pub fn null_stats(&self) -> Option<&NullStats> { + self.null_stats.as_ref() + } + + pub fn set_stats(&self) -> Option<&SetStats> { + self.set_stats.as_ref() + } +} + +// TODO: should include length (container count) +#[derive(Debug, Clone)] +pub enum PruningIntermediate { + IntermediateStats(ColumnStats), 
+ IntermediateResult(Vec), +} + +impl PruningIntermediate { + /// Create an `IntermediateStats` variant with no range or null statistics. + pub fn empty_stats() -> Self { + Self::IntermediateStats(ColumnStats::new(None, None)) + } +} diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index a8e4204a9336c..353f83ea74da7 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -305,6 +305,11 @@ impl PhysicalExpr for BinaryExpr { fn evaluate_pruning(&self, ctx: Arc) -> Result { use Operator::*; + + if matches!(self.op, And | Or) { + return self.evaluate_logical_pruning(ctx); + } + let is_cmp = matches!(self.op, Eq | NotEq | Lt | LtEq | Gt | GtEq); if !is_cmp { return self.default_pruning(ctx); @@ -350,9 +355,9 @@ impl PhysicalExpr for BinaryExpr { ) .unwrap_or(false); let pruning = if res { - PruningResult::AlwaysTrue + PruningResult::KeepAll } else { - PruningResult::AlwaysFalse + PruningResult::SkipAll }; return Ok(PruningIntermediate::IntermediateResult(vec![ pruning, @@ -375,8 +380,8 @@ impl PhysicalExpr for BinaryExpr { let mut results = Vec::with_capacity(l_range.len()); for (l, r) in l_range.into_iter().zip(r_range.into_iter()) { let res = match compare_ranges(&self.op, &l, &r) { - Some(true) => PruningResult::AlwaysTrue, - Some(false) => PruningResult::AlwaysFalse, + Some(true) => PruningResult::KeepAll, + Some(false) => PruningResult::SkipAll, None => PruningResult::Unknown, }; results.push(res); @@ -1035,7 +1040,66 @@ fn compare_ranges( } } +fn stats_len(stats: &ColumnStats) -> Option { + stats + .range_stats() + .map(|s| s.len()) + .or_else(|| stats.null_stats().map(|s| s.len())) + .or_else(|| stats.set_stats().map(|s| s.len())) +} + +fn pruning_to_results(intermediate: &PruningIntermediate) -> Option> { + match intermediate { + PruningIntermediate::IntermediateResult(results) => Some(results.clone()), + 
PruningIntermediate::IntermediateStats(stats) => { + stats_len(stats).map(|len| vec![PruningResult::Unknown; len]) + } + } +} + impl BinaryExpr { + fn evaluate_logical_pruning( + &self, + ctx: Arc, + ) -> Result { + use PruningResult::*; + + let left = self.left.evaluate_pruning(Arc::clone(&ctx))?; + let right = self.right.evaluate_pruning(ctx)?; + + let left_results = pruning_to_results(&left); + let right_results = pruning_to_results(&right); + + match (left_results, right_results) { + (Some(mut l), Some(r)) => { + assert_eq_or_internal_err!( + l.len(), + r.len(), + "Logical pruning inputs have mismatched lengths" + ); + + for (lres, rres) in l.iter_mut().zip(r.into_iter()) { + *lres = match self.op { + Operator::And => match (*lres, rres) { + (SkipAll, _) | (_, SkipAll) => SkipAll, + (KeepAll, KeepAll) => KeepAll, + _ => Unknown, + }, + Operator::Or => match (*lres, rres) { + (KeepAll, _) | (_, KeepAll) => KeepAll, + (SkipAll, SkipAll) => SkipAll, + _ => Unknown, + }, + _ => unreachable!("Logical pruning only handles AND/OR"), + }; + } + + Ok(PruningIntermediate::IntermediateResult(l)) + } + _ => Ok(PruningIntermediate::IntermediateResult(vec![Unknown])), + } + } + #[allow(dead_code)] fn default_pruning(&self, ctx: Arc) -> Result { let children = self.children(); diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 33ea1c7fbead6..e946940d41688 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -895,10 +895,10 @@ impl PhysicalExpr for InListExpr { let is_subset = values.iter().all(|v| list_values.contains(v)); let is_disjoint = values.iter().all(|v| !list_values.contains(v)); match (is_subset, is_disjoint, self.negated) { - (true, _, false) => PruningResult::AlwaysTrue, - (true, _, true) => PruningResult::AlwaysFalse, - (_, true, false) => PruningResult::AlwaysFalse, - (_, true, true) => PruningResult::AlwaysTrue, + 
(true, _, false) => PruningResult::KeepAll, + (true, _, true) => PruningResult::SkipAll, + (_, true, false) => PruningResult::SkipAll, + (_, true, true) => PruningResult::KeepAll, _ => PruningResult::Unknown, } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index a66740c116095..1fe5960502fe9 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -122,9 +122,9 @@ impl PhysicalExpr for IsNullExpr { let n = null_counts.value(i); let r = row_counts.value(i); if n == 0 { - AlwaysFalse + SkipAll } else if n == r { - AlwaysTrue + KeepAll } else { Unknown } diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 416b5f7d83884..791631b4d6e09 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -37,7 +37,14 @@ mod test { ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, }; - use crate::pruning_utils::{DummyStats, MockPruningStatistics}; + use crate::pruning_utils::{ + DummyStats, MockPruningStatistics, MultiColumnPruningStatistics, + }; + + // #[test] + // fn column_ref_stat() { + // let source_pruning_stat = MockPruningStatistics::new("a", Arc::new(Int32Array::from(vec![1,2,3])), Arc::new(Int32Array::from(vec![Some(10), None, Some(30)])), null_counts, row_counts) + // } #[test] fn column_pruning_uses_parquet_stats() { @@ -120,9 +127,9 @@ mod test { let pruning_stats = Arc::new(MockPruningStatistics::new_with_sets( "c", - mins, - maxs, - null_counts, + Some(mins), + Some(maxs), + Some(null_counts), Some(row_counts), Some(vec![Some(set.clone())]), )); @@ -249,7 +256,7 @@ mod test { let res = expr.evaluate_pruning(ctx).unwrap(); match res { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysTrue]) + assert_eq!(results, vec![PruningResult::KeepAll]) } other 
=> panic!("unexpected result: {other:?}"), } @@ -268,7 +275,7 @@ mod test { let res = expr.evaluate_pruning(ctx).unwrap(); match res { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysFalse; 2]) + assert_eq!(results, vec![PruningResult::SkipAll; 2]) } other => panic!("unexpected result: {other:?}"), } @@ -283,9 +290,9 @@ mod test { let rows = Arc::new(UInt64Array::from(vec![Some(3), Some(3)])); let stats = Arc::new(MockPruningStatistics::new( "a", - mins.clone(), - maxs.clone(), - zeros.clone(), + Some(mins.clone()), + Some(maxs.clone()), + Some(zeros.clone()), Some(rows.clone()), )); let ctx = Arc::new(PruningContext::new(stats)); @@ -293,7 +300,7 @@ mod test { let res = expr.evaluate_pruning(ctx).unwrap(); match res { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysTrue; 2]) + assert_eq!(results, vec![PruningResult::KeepAll; 2]) } other => panic!("unexpected result: {other:?}"), } @@ -303,9 +310,9 @@ mod test { let maxs = Arc::new(Int32Array::from(vec![Some(9), Some(4)])); let stats = Arc::new(MockPruningStatistics::new( "a", - mins, - maxs, - zeros, + Some(mins), + Some(maxs), + Some(zeros), Some(rows), )); let ctx = Arc::new(PruningContext::new(stats)); @@ -314,7 +321,7 @@ mod test { PruningIntermediate::IntermediateResult(results) => { assert_eq!( results, - vec![PruningResult::Unknown, PruningResult::AlwaysFalse] + vec![PruningResult::Unknown, PruningResult::SkipAll] ) } other => panic!("unexpected result: {other:?}"), @@ -329,9 +336,9 @@ mod test { let rows = Arc::new(UInt64Array::from(vec![Some(1)])); let stats = Arc::new(MockPruningStatistics::new( "a", - mins, - maxs, - zeros, + Some(mins), + Some(maxs), + Some(zeros), Some(rows), )); let ctx = Arc::new(PruningContext::new(stats)); @@ -373,7 +380,7 @@ mod test { let res = expr.evaluate_pruning(ctx).unwrap(); match res { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, 
vec![PruningResult::AlwaysFalse]) + assert_eq!(results, vec![PruningResult::SkipAll]) } other => panic!("unexpected result: {other:?}"), } @@ -392,7 +399,7 @@ mod test { match expr.evaluate_pruning(ctx).unwrap() { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysFalse]) + assert_eq!(results, vec![PruningResult::SkipAll]) } other => panic!("unexpected result: {other:?}"), } @@ -407,7 +414,7 @@ mod test { match expr.evaluate_pruning(ctx).unwrap() { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysTrue]) + assert_eq!(results, vec![PruningResult::KeepAll]) } other => panic!("unexpected result: {other:?}"), } @@ -430,9 +437,9 @@ mod test { let stats = Arc::new(MockPruningStatistics::new_with_sets( "c", - mins, - maxs, - null_counts, + Some(mins), + Some(maxs), + Some(null_counts), None, Some(vec![Some(value_set)]), )); @@ -449,7 +456,7 @@ mod test { match expr.evaluate_pruning(ctx).unwrap() { PruningIntermediate::IntermediateResult(results) => { - assert_eq!(results, vec![PruningResult::AlwaysFalse]) + assert_eq!(results, vec![PruningResult::SkipAll]) } other => panic!("unexpected result: {other:?}"), } @@ -488,9 +495,9 @@ mod test { let stats = Arc::new(MockPruningStatistics::new_with_sets( "c", - mins, - maxs, - null_counts, + Some(mins), + Some(maxs), + Some(null_counts), None, Some(vec![Some(set_true), Some(set_false), Some(set_unknown)]), )); @@ -519,8 +526,8 @@ mod test { assert_eq!( results, vec![ - PruningResult::AlwaysTrue, - PruningResult::AlwaysFalse, + PruningResult::KeepAll, + PruningResult::SkipAll, PruningResult::Unknown ] ); @@ -528,4 +535,115 @@ mod test { other => panic!("unexpected result: {other:?}"), } } + + #[test] + fn logical_or_prunes_combined_predicate() { + let num_containers = 3; + let mut stats = MultiColumnPruningStatistics::new(num_containers); + + // a ranges + stats.mins.insert( + "a".to_string(), + ScalarValue::iter_to_array(vec![ + 
ScalarValue::Int32(Some(2)), + ScalarValue::Int32(Some(0)), + ScalarValue::Int32(Some(0)), + ]) + .unwrap(), + ); + stats.maxs.insert( + "a".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::Int32(Some(3)), + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(3)), + ]) + .unwrap(), + ); + + // b null counts + stats.null_counts.insert( + "b".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(1)), + ]) + .unwrap(), + ); + stats.row_counts.insert( + "b".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(5)), + ]) + .unwrap(), + ); + + // c value sets + let mut set_true = HashSet::new(); + set_true.insert(ScalarValue::Utf8(Some("electronic".to_string()))); + + let mut set_false = HashSet::new(); + set_false.insert(ScalarValue::Utf8(Some("chair".to_string()))); + + let mut set_unknown = HashSet::new(); + set_unknown.insert(ScalarValue::Utf8(Some("book".to_string()))); + set_unknown.insert(ScalarValue::Utf8(Some("pencil".to_string()))); + + stats.value_sets.insert( + "c".to_string(), + vec![Some(set_true), Some(set_false), Some(set_unknown)], + ); + + let ctx = Arc::new(PruningContext::new(Arc::new(stats))); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Utf8, true), + ]); + + let mult_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Multiply, + lit(7), + )); + let a_gt = Arc::new(BinaryExpr::new(mult_expr, Operator::Gt, lit(10))); + let b_is_null = is_null(Arc::new(Column::new("b", 1))).unwrap(); + + let upper_udf = string::upper(); + let upper_expr = ScalarFunctionExpr::try_new( + upper_udf, + vec![Arc::new(Column::new("c", 2)) as Arc], + &schema, + Arc::new(ConfigOptions::new()), + ) + .unwrap(); + let c_in_list = in_list( + Arc::new(upper_expr), + 
vec![lit("ELECTRONIC"), lit("BOOK")], + &false, + &schema, + ) + .unwrap(); + + let left_or = + Arc::new(BinaryExpr::new(a_gt, Operator::Or, Arc::clone(&b_is_null))); + let predicate = BinaryExpr::new(left_or, Operator::Or, c_in_list); + + match predicate.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![ + PruningResult::KeepAll, + PruningResult::KeepAll, + PruningResult::Unknown + ] + ); + } + other => panic!("unexpected pruning result: {other:?}"), + } + } } diff --git a/datafusion/physical-expr/tests/pruning_utils.rs b/datafusion/physical-expr/tests/pruning_utils.rs index e1ff7d16b9392..bf61daceb97c7 100644 --- a/datafusion/physical-expr/tests/pruning_utils.rs +++ b/datafusion/physical-expr/tests/pruning_utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; @@ -50,9 +50,9 @@ impl PruningStatistics for DummyStats { /// actual file/row group metadata. 
pub struct MockPruningStatistics { column: String, - min_values: ArrayRef, - max_values: ArrayRef, - null_counts: ArrayRef, + min_values: Option, + max_values: Option, + null_counts: Option, row_counts: Option, num_containers: usize, value_sets: Option>>>, @@ -61,9 +61,9 @@ pub struct MockPruningStatistics { impl MockPruningStatistics { pub fn new( column: impl Into, - min_values: ArrayRef, - max_values: ArrayRef, - null_counts: ArrayRef, + min_values: Option, + max_values: Option, + null_counts: Option, row_counts: Option, ) -> Self { Self::new_with_sets( @@ -78,13 +78,19 @@ impl MockPruningStatistics { pub fn new_with_sets( column: impl Into, - min_values: ArrayRef, - max_values: ArrayRef, - null_counts: ArrayRef, + min_values: Option, + max_values: Option, + null_counts: Option, row_counts: Option, value_sets: Option>>>, ) -> Self { - let num_containers = min_values.len(); + let num_containers = min_values + .as_ref() + .or(max_values.as_ref()) + .or(null_counts.as_ref()) + .or(row_counts.as_ref()) + .map(|a| a.len()) + .unwrap_or(0); if let Some(value_sets) = value_sets.as_ref() { assert_eq!( value_sets.len(), @@ -128,17 +134,27 @@ impl MockPruningStatistics { ) .expect("rows"); - Self::new(column, mins, maxs, null_counts, Some(row_counts)) + Self::new( + column, + Some(mins), + Some(maxs), + Some(null_counts), + Some(row_counts), + ) } } impl PruningStatistics for MockPruningStatistics { fn min_values(&self, column: &Column) -> Option { - (column.name == self.column).then(|| Arc::clone(&self.min_values)) + (column.name == self.column) + .then(|| self.min_values.as_ref().map(Arc::clone)) + .flatten() } fn max_values(&self, column: &Column) -> Option { - (column.name == self.column).then(|| Arc::clone(&self.max_values)) + (column.name == self.column) + .then(|| self.max_values.as_ref().map(Arc::clone)) + .flatten() } fn num_containers(&self) -> usize { @@ -146,7 +162,9 @@ impl PruningStatistics for MockPruningStatistics { } fn null_counts(&self, column: &Column) 
-> Option { - (column.name == self.column).then(|| Arc::clone(&self.null_counts)) + (column.name == self.column) + .then(|| self.null_counts.as_ref().map(Arc::clone)) + .flatten() } fn row_counts(&self, column: &Column) -> Option { @@ -167,3 +185,56 @@ impl PruningStatistics for MockPruningStatistics { None } } + +#[derive(Clone)] +pub struct MultiColumnPruningStatistics { + pub mins: HashMap, + pub maxs: HashMap, + pub null_counts: HashMap, + pub row_counts: HashMap, + pub value_sets: HashMap>>>, + pub num_containers: usize, +} + +impl MultiColumnPruningStatistics { + pub fn new(num_containers: usize) -> Self { + Self { + mins: HashMap::new(), + maxs: HashMap::new(), + null_counts: HashMap::new(), + row_counts: HashMap::new(), + value_sets: HashMap::new(), + num_containers, + } + } +} + +impl PruningStatistics for MultiColumnPruningStatistics { + fn min_values(&self, column: &Column) -> Option { + self.mins.get(&column.name).cloned() + } + + fn max_values(&self, column: &Column) -> Option { + self.maxs.get(&column.name).cloned() + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, column: &Column) -> Option { + self.null_counts.get(&column.name).cloned() + } + + fn row_counts(&self, column: &Column) -> Option { + self.row_counts.get(&column.name).cloned() + } + + fn value_sets(&self, column: &Column) -> Option>>> { + self.value_sets.get(&column.name).cloned() + } + + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} From 5a54851ab716009f0c922d834713e183bc0d7ea8 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Thu, 25 Dec 2025 13:37:19 +0800 Subject: [PATCH 11/11] refactor NullStat representations --- .../physical-expr-common/src/physical_expr.rs | 4 +- .../src/physical_expr/pruning.rs | 59 ++++++++++++++----- .../physical-expr/src/expressions/is_null.rs | 49 ++++----------- datafusion/physical-expr/tests/pruning.rs | 27 ++------- 4 files changed, 63 insertions(+), 76 
deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index fe7498ce2b87b..a25392f330ea8 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -43,8 +43,8 @@ use itertools::izip; mod pruning; pub use pruning::{ - ColumnStats, NullStats, PruningContext, PruningIntermediate, PruningResult, - RangeStats, SetStats, + ColumnStats, NullPresence, NullStats, PruningContext, PruningIntermediate, + PruningResult, RangeStats, SetStats, }; /// Shared [`PhysicalExpr`]. diff --git a/datafusion/physical-expr-common/src/physical_expr/pruning.rs b/datafusion/physical-expr-common/src/physical_expr/pruning.rs index 83a81d35a3907..442864389ce6f 100644 --- a/datafusion/physical-expr-common/src/physical_expr/pruning.rs +++ b/datafusion/physical-expr-common/src/physical_expr/pruning.rs @@ -18,7 +18,7 @@ use std::collections::HashSet; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err}; @@ -49,11 +49,21 @@ pub enum RangeStats { Scalar { value: ScalarValue, length: usize }, } +/// Summaries about whether a container has nulls. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NullPresence { + /// Every row in the container is null. + AllNull, + /// No rows in the container are null. + NoNull, + /// Mixed values or insufficient information to decide. + Unknown, +} + +/// Null-related statistics for each container. 
#[derive(Debug, Clone)] pub struct NullStats { - null_counts: Option, - row_counts: Option, - length: usize, + presence: Vec, } #[derive(Debug, Clone)] @@ -140,23 +150,40 @@ impl NullStats { "Row counts length mismatch for pruning statistics" ); } - Ok(Self { - null_counts, - row_counts, - length, - }) - } - pub fn len(&self) -> usize { - self.length + let null_counts = null_counts + .as_ref() + .and_then(|counts| counts.as_any().downcast_ref::()); + let row_counts = row_counts + .as_ref() + .and_then(|counts| counts.as_any().downcast_ref::()); + + let mut presence = Vec::with_capacity(length); + for idx in 0..length { + let nulls = null_counts + .and_then(|counts| (!counts.is_null(idx)).then(|| counts.value(idx))); + let rows = row_counts + .and_then(|counts| (!counts.is_null(idx)).then(|| counts.value(idx))); + + let state = match (nulls, rows) { + (Some(0), Some(_)) => NullPresence::NoNull, + (Some(n), Some(r)) if n == r => NullPresence::AllNull, + (Some(0), None) => NullPresence::NoNull, + _ => NullPresence::Unknown, + }; + + presence.push(state); + } + + Ok(Self { presence }) } - pub fn null_counts(&self) -> Option<&ArrayRef> { - self.null_counts.as_ref() + pub fn len(&self) -> usize { + self.presence.len() } - pub fn row_counts(&self) -> Option<&ArrayRef> { - self.row_counts.as_ref() + pub fn presence(&self) -> &[NullPresence] { + &self.presence } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 1fe5960502fe9..6bf7b985d12c4 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -18,7 +18,6 @@ //! 
IS NULL expression use crate::PhysicalExpr; -use arrow::array::{Array, UInt64Array}; use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, @@ -27,7 +26,7 @@ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ - PruningContext, PruningIntermediate, + NullPresence, PruningContext, PruningIntermediate, }; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -103,41 +102,17 @@ impl PhysicalExpr for IsNullExpr { match child { PruningIntermediate::IntermediateStats(stats) => { if let Some(null_stats) = stats.null_stats() { - if let (Some(null_counts), Some(row_counts)) = - (null_stats.null_counts(), null_stats.row_counts()) - { - if let (Some(null_counts), Some(row_counts)) = ( - null_counts.as_any().downcast_ref::(), - row_counts.as_any().downcast_ref::(), - ) { - let len = null_counts.len(); - if len == row_counts.len() { - let mut results = Vec::with_capacity(len); - for i in 0..len { - let res = if null_counts.is_null(i) - || row_counts.is_null(i) - { - Unknown - } else { - let n = null_counts.value(i); - let r = row_counts.value(i); - if n == 0 { - SkipAll - } else if n == r { - KeepAll - } else { - Unknown - } - }; - results.push(res); - } - - return Ok(PruningIntermediate::IntermediateResult( - results, - )); - } - } - } + let results = null_stats + .presence() + .iter() + .map(|presence| match presence { + NullPresence::AllNull => KeepAll, + NullPresence::NoNull => SkipAll, + NullPresence::Unknown => Unknown, + }) + .collect(); + + return Ok(PruningIntermediate::IntermediateResult(results)); } Ok(PruningIntermediate::IntermediateResult(vec![Unknown])) } diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs index 791631b4d6e09..4b44fe21d12cc 100644 --- a/datafusion/physical-expr/tests/pruning.rs +++ b/datafusion/physical-expr/tests/pruning.rs @@ -34,18 +34,14 @@ mod test { BinaryExpr, Column, 
in_list, is_null, lit, }; use datafusion_physical_expr_common::physical_expr::{ - ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, + ColumnStats, NullPresence, PruningContext, PruningIntermediate, PruningResult, + RangeStats, }; use crate::pruning_utils::{ DummyStats, MockPruningStatistics, MultiColumnPruningStatistics, }; - // #[test] - // fn column_ref_stat() { - // let source_pruning_stat = MockPruningStatistics::new("a", Arc::new(Int32Array::from(vec![1,2,3])), Arc::new(Int32Array::from(vec![Some(10), None, Some(30)])), null_counts, row_counts) - // } - #[test] fn column_pruning_uses_parquet_stats() { // Dummy stats: two containers with constant value 10 and 3 rows each. @@ -88,21 +84,10 @@ mod test { let null_stats = stats.null_stats().expect("null stats"); assert_eq!(null_stats.len(), 2); - let null_counts = null_stats - .null_counts() - .expect("null counts") - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(null_counts, &UInt64Array::from(vec![Some(0), Some(0)])); - - let row_counts = null_stats - .row_counts() - .expect("row counts") - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(row_counts, &UInt64Array::from(vec![Some(3), Some(3)])); + assert_eq!( + null_stats.presence(), + &[NullPresence::NoNull, NullPresence::NoNull] + ); } other => panic!("expected stats, got {other:?}"), }