diff --git a/Cargo.lock b/Cargo.lock index 663c64bf8a789..e9f6c76a88092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2499,6 +2499,7 @@ dependencies = [ "insta", "itertools 0.14.0", "parking_lot", + "parquet", "paste", "petgraph 0.8.3", "rand 0.9.2", diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 5a7598ea1f299..2540058a3c80b 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -36,6 +36,8 @@ use crate::{ColumnStatistics, ScalarValue}; /// /// 3. Whether the values in a column are contained in a set of literals /// +/// 4. Known value sets for columns, on a per-container basis +/// /// # Vectorized Interface /// /// Information for containers / files are returned as Arrow [`ArrayRef`], so @@ -105,6 +107,19 @@ pub trait PruningStatistics { /// [`UInt64Array`]: arrow::array::UInt64Array fn row_counts(&self, column: &Column) -> Option; + /// Return the known set of values for the named column in each container. + /// + /// The returned vector must contain [`Self::num_containers`] entries. + /// Each entry corresponds to a container and can be: + /// * `Some(HashSet)` if the values for that container are known + /// * `None` if the set is unknown for that container. + /// + /// If set statistics are not available for any container, return `None` + /// (the default). + fn value_sets(&self, _column: &Column) -> Option>>> { + None + } + /// Returns [`BooleanArray`] where each row represents information known /// about specific literal `values` in a column. /// diff --git a/datafusion/expr/src/async_udf.rs b/datafusion/expr/src/async_udf.rs index 8afdfda68dea0..be9df0d57bee9 100644 --- a/datafusion/expr/src/async_udf.rs +++ b/datafusion/expr/src/async_udf.rs @@ -125,6 +125,13 @@ impl ScalarUDFImpl for AsyncScalarUDF { fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { internal_err!("async functions should not be called directly") } + + fn propagate_set_stats( + &self, + child_set_stats: &[crate::SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } } impl Display for AsyncScalarUDF { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index b02254bb7d486..aa0bdb08afc1b 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -96,6 +96,7 @@ pub use datafusion_expr_common::signature::{ TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility, }; pub use datafusion_expr_common::type_coercion::binary; +pub use datafusion_physical_expr_common::physical_expr::SetStats; pub use expr::{ Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GroupingSet, Like, Sort as SortExpr, TryCast, WindowFunctionDefinition, diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 26d7fc99cb17c..8e4f3b0036aa1 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -22,7 +22,7 @@ use crate::expr::schema_name_from_exprs_comma_separated_without_space; use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::sort_properties::{ExprProperties, SortProperties}; use crate::udf_eq::UdfEq; -use crate::{ColumnarValue, Documentation, Expr, Signature}; +use crate::{ColumnarValue, Documentation, Expr, SetStats, Signature}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -321,6 +321,14 @@ impl ScalarUDF { self.inner.propagate_constraints(interval, inputs) } + /// Propagate set statistics through this function. + pub fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } + /// Calculates the [`SortProperties`] of this function based on its /// children's properties. pub fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { @@ -785,6 +793,17 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { Ok(Some(vec![])) } + /// Propagate set statistics from children through this function. + /// + /// Returns `Ok(None)` by default, indicating the function does not support + /// set-stat based pruning. + fn propagate_set_stats( + &self, + _child_set_stats: &[SetStats], + ) -> Result> { + Ok(None) + } + /// Calculates the [`SortProperties`] of this function based on its children's properties. fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { if !self.preserves_lex_ordering(inputs)? { @@ -949,6 +968,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.propagate_constraints(interval, inputs) } + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.inner.propagate_set_stats(child_set_stats) + } + fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { self.inner.output_ordering(inputs) } diff --git a/datafusion/functions/src/string/upper.rs b/datafusion/functions/src/string/upper.rs index a2a7db1848f59..20220038b083a 100644 --- a/datafusion/functions/src/string/upper.rs +++ b/datafusion/functions/src/string/upper.rs @@ -18,14 +18,15 @@ use crate::string::common::to_upper; use crate::utils::utf8_to_str_type; use arrow::datatypes::DataType; -use datafusion_common::Result; use datafusion_common::types::logical_string; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ - Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - TypeSignatureClass, Volatility, + Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, SetStats, + Signature, TypeSignatureClass, Volatility, }; use datafusion_macros::user_doc; use std::any::Any; +use std::collections::HashSet; #[user_doc( doc_section(label = "String Functions"), @@ -88,6 +89,50 @@ impl ScalarUDFImpl for UpperFunc { to_upper(&args.args, "upper") } + /// For input stat, capitalize the set statistics. + /// + /// Example: + /// Input expression `c` has set stat `{'foo', 'bar'}` + /// This function `upper(c)` will propagate the set stat to `{'FOO', 'BAR'}` + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + if child_set_stats.len() != 1 { + return Ok(None); + } + + let input_sets = &child_set_stats[0]; + let mut upper_sets = Vec::with_capacity(input_sets.len()); + + for values in input_sets.value_sets() { + let Some(values) = values else { + upper_sets.push(None); + continue; + }; + + let mut upper_values = HashSet::with_capacity(values.len()); + for value in values { + let upper_value = match value { + ScalarValue::Utf8(v) => { + ScalarValue::Utf8(v.as_ref().map(|s| s.to_uppercase())) + } + ScalarValue::LargeUtf8(v) => { + ScalarValue::LargeUtf8(v.as_ref().map(|s| s.to_uppercase())) + } + ScalarValue::Utf8View(v) => { + ScalarValue::Utf8View(v.as_ref().map(|s| s.to_uppercase())) + } + _ => return Ok(None), + }; + upper_values.insert(upper_value); + } + upper_sets.push(Some(upper_values)); + } + + SetStats::new(upper_sets, input_sets.len()).map(Some) + } + fn documentation(&self) -> Option<&Documentation> { self.doc() } diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 2358a21940912..a25392f330ea8 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -40,6 +40,13 @@ use datafusion_expr_common::statistics::Distribution; use itertools::izip; +mod pruning; + +pub use pruning::{ + ColumnStats, NullPresence, NullStats, PruningContext, PruningIntermediate, + PruningResult, RangeStats, SetStats, +}; + /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; @@ -430,6 +437,165 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn is_volatile_node(&self) -> bool { false } + + // --------------- + // Pruning related + // --------------- + // None means propagation not implemented/supported for this node + fn propagate_range_stats( + &self, + _child_range_stats: &[RangeStats], + ) -> Result> { + Ok(None) + } + + fn propagate_null_stats( + &self, + _child_range_stats: &[NullStats], + ) -> Result> { + Ok(None) + } + + fn propagate_set_stats( + &self, + _child_set_stats: &[SetStats], + ) -> Result> { + Ok(None) + } + + fn evaluate_pruning(&self, ctx: Arc) -> Result { + // Default impl for stats-propagation nodes (e.g. arithmetic expressions): + // 1) Evaluate pruning for all children. + // 2) If every child produced range/null stats, propagate them. + // 3) If no stats can be propagated, fall back to `Unsupported`. + let children = self.children(); + if children.is_empty() { + return Ok(PruningIntermediate::empty_stats()); + } + + let mut range_complete = true; + let mut null_complete = true; + let mut set_complete = true; + let mut child_range_stats = Vec::with_capacity(children.len()); + let mut child_null_stats = Vec::with_capacity(children.len()); + let mut child_set_stats = Vec::with_capacity(children.len()); + + for child in children { + match child.evaluate_pruning(Arc::clone(&ctx))? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(range_stats) if range_complete => { + child_range_stats.push(range_stats); + } + _ => { + range_complete = false; + } + } + + match stats.null_stats { + Some(null_stats) if null_complete => { + child_null_stats.push(null_stats); + } + _ => { + null_complete = false; + } + } + + match stats.set_stats { + Some(set_stats) if set_complete => { + child_set_stats.push(set_stats); + } + _ => { + set_complete = false; + } + } + } + // Without node-specific semantics, we can't combine a final pruning result here. + other => return Ok(other), + } + } + + let range_stats = if range_complete && !child_range_stats.is_empty() { + if let Some((first, rest)) = child_range_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Range stats length mismatch between pruning children" + ); + } + } + self.propagate_range_stats(&child_range_stats)? + } else { + None + }; + + let null_stats = if null_complete && !child_null_stats.is_empty() { + if let Some((first, rest)) = child_null_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Null stats length mismatch between pruning children" + ); + } + } + self.propagate_null_stats(&child_null_stats)? + } else { + None + }; + + let set_stats = if set_complete && !child_set_stats.is_empty() { + if let Some((first, rest)) = child_set_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Set stats length mismatch between pruning children" + ); + } + } + self.propagate_set_stats(&child_set_stats)? + } else { + None + }; + + if let (Some(range_stats), Some(null_stats)) = + (range_stats.as_ref(), null_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + null_stats.len(), + "Range and null stats length mismatch for pruning" + ); + } + + if let (Some(range_stats), Some(set_stats)) = + (range_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + set_stats.len(), + "Range and set stats length mismatch for pruning" + ); + } + + if let (Some(null_stats), Some(set_stats)) = + (null_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + null_stats.len(), + set_stats.len(), + "Null and set stats length mismatch for pruning" + ); + } + + Ok(PruningIntermediate::IntermediateStats(ColumnStats { + range_stats, + null_stats, + set_stats, + })) + } } #[deprecated( diff --git a/datafusion/physical-expr-common/src/physical_expr/pruning.rs b/datafusion/physical-expr-common/src/physical_expr/pruning.rs new file mode 100644 index 0000000000000..442864389ce6f --- /dev/null +++ b/datafusion/physical-expr-common/src/physical_expr/pruning.rs @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err}; + +// Pruner Common +/// e.g. for x > 5 +/// bucket 1 has stat [10,15] -> KeepAll +/// bucket 2 has stat [0,5] -> SkipAll (prune it) +/// bucket 3 has stat [0,10] -> Unknown +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PruningResult { + KeepAll, + SkipAll, + Unknown, +} + +#[derive(Debug, Clone)] +pub enum RangeStats { + Values { + mins: Option, + maxs: Option, + length: usize, + }, + /// Represents a uniform literal value across all containers. + /// This variant make it easy to compare between literals and normal ranges representing + /// each containers' value range. + /// + /// TODO: remove length -- seems redundant + Scalar { value: ScalarValue, length: usize }, +} + +/// Summaries about whether a container has nulls. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NullPresence { + /// Every row in the container is null. + AllNull, + /// No rows in the container are null. + NoNull, + /// Mixed values or insufficient information to decide. + Unknown, +} + +/// Null-related statistics for each container. +#[derive(Debug, Clone)] +pub struct NullStats { + presence: Vec, +} + +#[derive(Debug, Clone)] +pub struct SetStats { + sets: Vec>>, +} + +#[derive(Debug, Clone)] +pub struct ColumnStats { + pub range_stats: Option, + pub null_stats: Option, + pub set_stats: Option, +} + +impl RangeStats { + pub fn new( + mins: Option, + maxs: Option, + length: usize, + ) -> Result { + if let Some(ref mins) = mins { + assert_eq_or_internal_err!( + mins.len(), + length, + "Range mins length mismatch for pruning statistics" + ); + } + if let Some(ref maxs) = maxs { + assert_eq_or_internal_err!( + maxs.len(), + length, + "Range maxs length mismatch for pruning statistics" + ); + } + Ok(Self::Values { mins, maxs, length }) + } + + /// Create range stats for a constant literal across all containers. + /// + pub fn new_scalar(value: ScalarValue, length: usize) -> Result { + Ok(Self::Scalar { value, length }) + } + + pub fn len(&self) -> usize { + match self { + RangeStats::Values { length, .. } | RangeStats::Scalar { length, .. } => { + *length + } + } + } +} + +pub struct PruningContext { + stats: Arc, +} + +impl PruningContext { + pub fn new(stats: Arc) -> Self { + Self { stats } + } + + pub fn statistics(&self) -> &Arc { + &self.stats + } +} + +impl NullStats { + pub fn new( + null_counts: Option, + row_counts: Option, + length: usize, + ) -> Result { + if let Some(ref null_counts) = null_counts { + assert_eq_or_internal_err!( + null_counts.len(), + length, + "Null counts length mismatch for pruning statistics" + ); + } + if let Some(ref row_counts) = row_counts { + assert_eq_or_internal_err!( + row_counts.len(), + length, + "Row counts length mismatch for pruning statistics" + ); + } + + let null_counts = null_counts + .as_ref() + .and_then(|counts| counts.as_any().downcast_ref::()); + let row_counts = row_counts + .as_ref() + .and_then(|counts| counts.as_any().downcast_ref::()); + + let mut presence = Vec::with_capacity(length); + for idx in 0..length { + let nulls = null_counts + .and_then(|counts| (!counts.is_null(idx)).then(|| counts.value(idx))); + let rows = row_counts + .and_then(|counts| (!counts.is_null(idx)).then(|| counts.value(idx))); + + let state = match (nulls, rows) { + (Some(0), Some(_)) => NullPresence::NoNull, + (Some(n), Some(r)) if n == r => NullPresence::AllNull, + (Some(0), None) => NullPresence::NoNull, + _ => NullPresence::Unknown, + }; + + presence.push(state); + } + + Ok(Self { presence }) + } + + pub fn len(&self) -> usize { + self.presence.len() + } + + pub fn presence(&self) -> &[NullPresence] { + &self.presence + } +} + +impl SetStats { + pub fn new(sets: Vec>>, length: usize) -> Result { + assert_eq_or_internal_err!( + sets.len(), + length, + "Set stats length mismatch for pruning statistics" + ); + Ok(Self { sets }) + } + + pub fn len(&self) -> usize { + self.sets.len() + } + + pub fn value_sets(&self) -> &[Option>] { + &self.sets + } +} + +impl ColumnStats { + pub fn new(range_stats: Option, null_stats: Option) -> Self { + Self::new_with_set_stats(range_stats, null_stats, None) + } + + pub fn new_with_set_stats( + range_stats: Option, + null_stats: Option, + set_stats: Option, + ) -> Self { + Self { + range_stats, + null_stats, + set_stats, + } + } + + pub fn range_stats(&self) -> Option<&RangeStats> { + self.range_stats.as_ref() + } + + pub fn null_stats(&self) -> Option<&NullStats> { + self.null_stats.as_ref() + } + + pub fn set_stats(&self) -> Option<&SetStats> { + self.set_stats.as_ref() + } +} + +// TODO: should include length (container count) +#[derive(Debug, Clone)] +pub enum PruningIntermediate { + IntermediateStats(ColumnStats), + IntermediateResult(Vec), +} + +impl PruningIntermediate { + /// Create an `IntermediateStats` variant with no range or null statistics. + pub fn empty_stats() -> Self { + Self::IntermediateStats(ColumnStats::new(None, None)) + } +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 1b23beeaa37cc..66720a43c506b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -68,6 +68,7 @@ datafusion-functions = { workspace = true } insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } +parquet = { workspace = true } [[bench]] harness = false diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 8df09c22bbd8d..353f83ea74da7 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -29,7 +29,9 @@ use arrow::compute::{SlicesIterator, cast, filter_record_batch}; use arrow::datatypes::*; use arrow::error::ArrowError; use datafusion_common::cast::as_boolean_array; -use datafusion_common::{Result, ScalarValue, internal_err, not_impl_err}; +use datafusion_common::{ + Result, ScalarValue, assert_eq_or_internal_err, internal_err, not_impl_err, +}; use datafusion_expr::binary::BinaryTypeCoercer; use datafusion_expr::interval_arithmetic::{Interval, apply_operator}; use datafusion_expr::sort_properties::ExprProperties; @@ -40,6 +42,10 @@ use datafusion_expr::statistics::{ }; use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr_common::datum::{apply, apply_cmp}; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, PruningResult, RangeStats, +}; +use std::cmp::Ordering; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, @@ -297,6 +303,135 @@ impl PhysicalExpr for BinaryExpr { .map(ColumnarValue::Array) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + use Operator::*; + + if matches!(self.op, And | Or) { + return self.evaluate_logical_pruning(ctx); + } + + let is_cmp = matches!(self.op, Eq | NotEq | Lt | LtEq | Gt | GtEq); + if !is_cmp { + return self.default_pruning(ctx); + } + + let left = self.left.evaluate_pruning(Arc::clone(&ctx))?; + let right = self.right.evaluate_pruning(ctx)?; + + let (left_stats, right_stats) = match (left, right) { + ( + PruningIntermediate::IntermediateStats(ls), + PruningIntermediate::IntermediateStats(rs), + ) => (ls, rs), + (other, PruningIntermediate::IntermediateStats(_)) + | (PruningIntermediate::IntermediateStats(_), other) => return Ok(other), + (l, r) => { + return Ok(if matches!(l, PruningIntermediate::IntermediateResult(_)) { + l + } else { + r + }); + } + }; + + let l_range = range_bounds(left_stats.range_stats.as_ref()); + let r_range = range_bounds(right_stats.range_stats.as_ref()); + + let (l_range, r_range) = match (l_range, r_range) { + (Some(l), Some(r)) if l.len() == r.len() && !l.is_empty() => (l, r), + (Some(l), Some(r)) if l.is_empty() && r.is_empty() => { + match ( + left_stats.range_stats.as_ref(), + right_stats.range_stats.as_ref(), + ) { + ( + Some(RangeStats::Scalar { value: lv, .. }), + Some(RangeStats::Scalar { value: rv, .. }), + ) => { + let res = compare_ranges( + &self.op, + &(Some(lv.clone()), Some(lv.clone())), + &(Some(rv.clone()), Some(rv.clone())), + ) + .unwrap_or(false); + let pruning = if res { + PruningResult::KeepAll + } else { + PruningResult::SkipAll + }; + return Ok(PruningIntermediate::IntermediateResult(vec![ + pruning, + ])); + } + _ => { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown, + ])); + } + } + } + _ => { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown, + ])); + } + }; + + let mut results = Vec::with_capacity(l_range.len()); + for (l, r) in l_range.into_iter().zip(r_range.into_iter()) { + let res = match compare_ranges(&self.op, &l, &r) { + Some(true) => PruningResult::KeepAll, + Some(false) => PruningResult::SkipAll, + None => PruningResult::Unknown, + }; + results.push(res); + } + + Ok(PruningIntermediate::IntermediateResult(results)) + } + + fn propagate_range_stats( + &self, + child_range_stats: &[RangeStats], + ) -> Result> { + use Operator::Multiply; + + match self.op { + Multiply => match (&child_range_stats[0], &child_range_stats[1]) { + ( + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + length, + }, + RangeStats::Scalar { value: scalar, .. }, + ) => multiply_range_by_scalar(mins, maxs, *length, scalar), + ( + RangeStats::Scalar { + value: scalar, + length, + }, + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + .. + }, + ) => multiply_range_by_scalar(mins, maxs, *length, scalar), + _ => unimplemented!( + "Range propagation for * operator on children {:?} and {:?} has not been implemented yet", + child_range_stats[0], + child_range_stats[1] + ), + }, + _ => { + unimplemented!( + "Range propagation for {} has not been implemented", + self.op + ) + } + } + } + fn children(&self) -> Vec<&Arc> { vec![&self.left, &self.right] } @@ -746,6 +881,356 @@ fn check_short_circuit<'a>( ShortCircuitStrategy::None } +fn multiply_range_by_scalar( + mins: &ArrayRef, + maxs: &ArrayRef, + length: usize, + scalar: &ScalarValue, +) -> Result> { + let result_type = BinaryTypeCoercer::new( + mins.data_type(), + &Operator::Multiply, + &scalar.data_type(), + ) + .get_result_type()?; + + let mins = match cast(mins.as_ref(), &result_type) { + Ok(arr) => arr, + Err(_) => return Ok(None), + }; + let maxs = match cast(maxs.as_ref(), &result_type) { + Ok(arr) => arr, + Err(_) => return Ok(None), + }; + let scalar = scalar.cast_to(&result_type)?; + + let mut out_mins = Vec::with_capacity(length); + let mut out_maxs = Vec::with_capacity(length); + + for idx in 0..length { + let min = ScalarValue::try_from_array(mins.as_ref(), idx)?; + let max = ScalarValue::try_from_array(maxs.as_ref(), idx)?; + + let p1 = min.mul_checked(&scalar)?; + let p2 = max.mul_checked(&scalar)?; + + let (low, high) = match p1.partial_cmp(&p2) { + Some(Ordering::Greater) => (p2, p1), + Some(_) => (p1, p2), + None => return Ok(None), + }; + out_mins.push(low); + out_maxs.push(high); + } + + let mins = ScalarValue::iter_to_array(out_mins.into_iter())?; + let maxs = ScalarValue::iter_to_array(out_maxs.into_iter())?; + RangeStats::new(Some(mins), Some(maxs), length).map(Some) +} + +fn range_bounds( + stats: Option<&RangeStats>, +) -> Option, Option)>> { + match stats? { + RangeStats::Values { mins, maxs, length } => { + let mins = mins.as_ref()?; + let maxs = maxs.as_ref()?; + let mut out = Vec::with_capacity(*length); + for i in 0..*length { + let min = ScalarValue::try_from_array(mins, i).ok(); + let max = ScalarValue::try_from_array(maxs, i).ok(); + out.push((min, max)); + } + Some(out) + } + RangeStats::Scalar { value, length } => Some( + (0..*length) + .map(|_| (Some(value.clone()), Some(value.clone()))) + .collect(), + ), + } +} + +fn compare_ranges( + op: &Operator, + left: &(Option, Option), + right: &(Option, Option), +) -> Option { + use Operator::*; + let (lmin, lmax) = left; + let (rmin, rmax) = right; + + let ord_lmin_rmax = lmin + .as_ref() + .zip(rmax.as_ref()) + .and_then(|(l, r)| l.partial_cmp(r)); + let ord_lmax_rmin = lmax + .as_ref() + .zip(rmin.as_ref()) + .and_then(|(l, r)| l.partial_cmp(r)); + + match op { + Gt => { + if matches!(ord_lmin_rmax, Some(Ordering::Greater)) { + Some(true) + } else if matches!(ord_lmax_rmin, Some(Ordering::Less | Ordering::Equal)) { + Some(false) + } else { + None + } + } + GtEq => { + if matches!(ord_lmin_rmax, Some(Ordering::Greater | Ordering::Equal)) { + Some(true) + } else if matches!(ord_lmax_rmin, Some(Ordering::Less)) { + Some(false) + } else { + None + } + } + Lt => { + if matches!(ord_lmax_rmin, Some(Ordering::Less)) { + Some(true) + } else if matches!(ord_lmin_rmax, Some(Ordering::Greater | Ordering::Equal)) { + Some(false) + } else { + None + } + } + LtEq => { + if matches!(ord_lmax_rmin, Some(Ordering::Less | Ordering::Equal)) { + Some(true) + } else if matches!(ord_lmin_rmax, Some(Ordering::Greater)) { + Some(false) + } else { + None + } + } + Eq => { + if let (Some(lmin), Some(lmax), Some(rmin), Some(rmax)) = + (lmin.as_ref(), lmax.as_ref(), rmin.as_ref(), rmax.as_ref()) + { + if lmin == lmax && rmin == rmax && lmin == rmin { + return Some(true); + } + if matches!(lmax.partial_cmp(rmin), Some(Ordering::Less)) + || matches!(rmax.partial_cmp(lmin), Some(Ordering::Less)) + { + return Some(false); + } + } + None + } + NotEq => { + if let (Some(lmin), Some(lmax), Some(rmin), Some(rmax)) = + (lmin.as_ref(), lmax.as_ref(), rmin.as_ref(), rmax.as_ref()) + { + if lmin == lmax && rmin == rmax && lmin == rmin { + return Some(false); + } + if matches!(lmax.partial_cmp(rmin), Some(Ordering::Less)) + || matches!(rmax.partial_cmp(lmin), Some(Ordering::Less)) + { + return Some(true); + } + } + None + } + _ => None, + } +} + +fn stats_len(stats: &ColumnStats) -> Option { + stats + .range_stats() + .map(|s| s.len()) + .or_else(|| stats.null_stats().map(|s| s.len())) + .or_else(|| stats.set_stats().map(|s| s.len())) +} + +fn pruning_to_results(intermediate: &PruningIntermediate) -> Option> { + match intermediate { + PruningIntermediate::IntermediateResult(results) => Some(results.clone()), + PruningIntermediate::IntermediateStats(stats) => { + stats_len(stats).map(|len| vec![PruningResult::Unknown; len]) + } + } +} + +impl BinaryExpr { + fn evaluate_logical_pruning( + &self, + ctx: Arc, + ) -> Result { + use PruningResult::*; + + let left = self.left.evaluate_pruning(Arc::clone(&ctx))?; + let right = self.right.evaluate_pruning(ctx)?; + + let left_results = pruning_to_results(&left); + let right_results = pruning_to_results(&right); + + match (left_results, right_results) { + (Some(mut l), Some(r)) => { + assert_eq_or_internal_err!( + l.len(), + r.len(), + "Logical pruning inputs have mismatched lengths" + ); + + for (lres, rres) in l.iter_mut().zip(r.into_iter()) { + *lres = match self.op { + Operator::And => match (*lres, rres) { + (SkipAll, _) | (_, SkipAll) => SkipAll, + (KeepAll, KeepAll) => KeepAll, + _ => Unknown, + }, + Operator::Or => match (*lres, rres) { + (KeepAll, _) | (_, KeepAll) => KeepAll, + (SkipAll, SkipAll) => SkipAll, + _ => Unknown, + }, + _ => unreachable!("Logical pruning only handles AND/OR"), + }; + } + + Ok(PruningIntermediate::IntermediateResult(l)) + } + _ => Ok(PruningIntermediate::IntermediateResult(vec![Unknown])), + } + } + + #[allow(dead_code)] + fn default_pruning(&self, ctx: Arc) -> Result { + let children = self.children(); + if children.is_empty() { + return Ok(PruningIntermediate::empty_stats()); + } + + let mut range_complete = true; + let mut null_complete = true; + let mut set_complete = true; + let mut child_range_stats = Vec::with_capacity(children.len()); + let mut child_null_stats = Vec::with_capacity(children.len()); + let mut child_set_stats = Vec::with_capacity(children.len()); + + for child in children { + match child.evaluate_pruning(Arc::clone(&ctx))? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(range_stats) if range_complete => { + child_range_stats.push(range_stats); + } + _ => { + range_complete = false; + } + } + + match stats.null_stats { + Some(null_stats) if null_complete => { + child_null_stats.push(null_stats); + } + _ => { + null_complete = false; + } + } + + match stats.set_stats { + Some(set_stats) if set_complete => { + child_set_stats.push(set_stats); + } + _ => { + set_complete = false; + } + } + } + other => return Ok(other), + } + } + + let range_stats = if range_complete && !child_range_stats.is_empty() { + if let Some((first, rest)) = child_range_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Range stats length mismatch between pruning children" + ); + } + } + self.propagate_range_stats(&child_range_stats)? + } else { + None + }; + + let null_stats = if null_complete && !child_null_stats.is_empty() { + if let Some((first, rest)) = child_null_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Null stats length mismatch between pruning children" + ); + } + } + self.propagate_null_stats(&child_null_stats)? + } else { + None + }; + + let set_stats = if set_complete && !child_set_stats.is_empty() { + if let Some((first, rest)) = child_set_stats.split_first() { + for stats in rest { + assert_eq_or_internal_err!( + first.len(), + stats.len(), + "Set stats length mismatch between pruning children" + ); + } + } + self.propagate_set_stats(&child_set_stats)? + } else { + None + }; + + if let (Some(range_stats), Some(null_stats)) = + (range_stats.as_ref(), null_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + null_stats.len(), + "Range and null stats length mismatch for pruning" + ); + } + + if let (Some(range_stats), Some(set_stats)) = + (range_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + range_stats.len(), + set_stats.len(), + "Range and set stats length mismatch for pruning" + ); + } + + if let (Some(null_stats), Some(set_stats)) = + (null_stats.as_ref(), set_stats.as_ref()) + { + assert_eq_or_internal_err!( + null_stats.len(), + set_stats.len(), + "Null and set stats length mismatch for pruning" + ); + } + + Ok(PruningIntermediate::IntermediateStats(ColumnStats { + range_stats, + null_stats, + set_stats, + })) + } +} + /// Creates a new boolean array based on the evaluation of the right expression, /// but only for positions where the left_result is true. /// diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 8c7e8c319fff4..d29c52c611130 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -28,8 +28,11 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{Result, internal_err, plan_err}; +use datafusion_common::{Column as DFColumn, Result, internal_err, plan_err}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, NullStats, PruningContext, PruningIntermediate, RangeStats, SetStats, +}; /// Represents the column at a given index in a RecordBatch /// @@ -143,6 +146,39 @@ impl PhysicalExpr for Column { Ok(self) } + /// Read the column statistics from `PruningContext`, transform the input stat + /// to the stats in the internal representation (`PruningIntermediate`) + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let pruning_column = DFColumn::from_name(self.name()); + let stats = ctx.statistics(); + let num_containers = stats.num_containers(); + + let min_values = stats.min_values(&pruning_column); + let max_values = stats.max_values(&pruning_column); + let range_stats = if min_values.is_some() || max_values.is_some() { + Some(RangeStats::new(min_values, max_values, num_containers)?) + } else { + None + }; + + let null_counts = stats.null_counts(&pruning_column); + let row_counts = stats.row_counts(&pruning_column); + let null_stats = if null_counts.is_some() || row_counts.is_some() { + Some(NullStats::new(null_counts, row_counts, num_containers)?) + } else { + None + }; + + let set_stats = stats + .value_sets(&pruning_column) + .map(|sets| SetStats::new(sets, num_containers)) + .transpose()?; + + Ok(PruningIntermediate::IntermediateStats( + ColumnStats::new_with_set_stats(range_stats, null_stats, set_stats), + )) + } + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index b6b67c85c4881..e946940d41688 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -18,6 +18,7 @@ //! Implementation of `InList` expressions: [`InListExpr`] use std::any::Any; +use std::collections::HashSet as StdHashSet; use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -37,6 +38,9 @@ use datafusion_common::{ exec_err, }; use datafusion_expr::{ColumnarValue, expr_vec_fmt}; +use datafusion_physical_expr_common::physical_expr::{ + PruningContext, PruningIntermediate, PruningResult, RangeStats, +}; use ahash::RandomState; use datafusion_common::HashMap; @@ -682,6 +686,29 @@ impl InListExpr { Ok(Self::new(expr, list, negated, static_filter)) } + + fn list_value_set( + &self, + ctx: &Arc, + ) -> Result>> { + let mut values = StdHashSet::new(); + for expr in &self.list { + match expr.evaluate_pruning(Arc::clone(ctx))? { + PruningIntermediate::IntermediateStats(stats) => { + match stats.range_stats { + Some(RangeStats::Scalar { value, .. }) => { + if !value.is_null() { + values.insert(value); + } + } + _ => return Ok(None), + } + } + _ => return Ok(None), + } + } + Ok(Some(values)) + } } impl std::fmt::Display for InListExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -828,6 +855,61 @@ impl PhysicalExpr for InListExpr { Ok(ColumnarValue::Array(Arc::new(r))) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let left = self.expr.evaluate_pruning(Arc::clone(&ctx))?; + let stats = match left { + PruningIntermediate::IntermediateStats(stats) => stats, + _ => unreachable!("Input must have IntermediateStats type"), + }; + + let container_len = stats + .set_stats() + .map(|s| s.len()) + .or_else(|| stats.range_stats().map(|s| s.len())) + .or_else(|| stats.null_stats().map(|s| s.len())) + .unwrap_or(1); + + let Some(list_values) = self.list_value_set(&ctx)? else { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown; + container_len + ])); + }; + + let Some(set_stats) = stats.set_stats() else { + return Ok(PruningIntermediate::IntermediateResult(vec![ + PruningResult::Unknown; + container_len + ])); + }; + + let mut results = Vec::with_capacity(set_stats.len()); + for container_values in set_stats.value_sets() { + let res = match container_values { + None => PruningResult::Unknown, + Some(values) => match values.is_empty() + || values.iter().any(|v| v.is_null()) + { + true => PruningResult::Unknown, + false => { + let is_subset = values.iter().all(|v| list_values.contains(v)); + let is_disjoint = values.iter().all(|v| !list_values.contains(v)); + match (is_subset, is_disjoint, self.negated) { + (true, _, false) => PruningResult::KeepAll, + (true, _, true) => PruningResult::SkipAll, + (_, true, false) => PruningResult::SkipAll, + (_, true, true) => PruningResult::KeepAll, + _ => PruningResult::Unknown, + } + } + }, + }; + results.push(res); + } + + Ok(PruningIntermediate::IntermediateResult(results)) + } + fn children(&self) -> Vec<&Arc> { let mut children = vec![&self.expr]; children.extend(&self.list); diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 356fe2a866672..6bf7b985d12c4 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -25,6 +25,9 @@ use arrow::{ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{ + NullPresence, PruningContext, PruningIntermediate, +}; use std::hash::Hash; use std::{any::Any, sync::Arc}; @@ -92,6 +95,31 @@ impl PhysicalExpr for IsNullExpr { } } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + use datafusion_physical_expr_common::physical_expr::PruningResult::*; + + let child = self.arg.evaluate_pruning(ctx)?; + match child { + PruningIntermediate::IntermediateStats(stats) => { + if let Some(null_stats) = stats.null_stats() { + let results = null_stats + .presence() + .iter() + .map(|presence| match presence { + NullPresence::AllNull => KeepAll, + NullPresence::NoNull => SkipAll, + NullPresence::Unknown => Unknown, + }) + .collect(); + + return Ok(PruningIntermediate::IntermediateResult(results)); + } + Ok(PruningIntermediate::IntermediateResult(vec![Unknown])) + } + other => Ok(other), + } + } + fn children(&self) -> Vec<&Arc> { vec![&self.arg] } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 1f3fefc60b7ad..49b12d411b8c0 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -18,6 +18,7 @@ //! Literal expressions for physical operations use std::any::Any; +use std::collections::HashSet; use std::hash::Hash; use std::sync::Arc; @@ -34,6 +35,9 @@ use datafusion_expr::Expr; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; +use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, PruningContext, PruningIntermediate, RangeStats, SetStats, +}; /// Represents a literal value #[derive(Debug, PartialEq, Eq, Clone)] @@ -112,6 +116,23 @@ impl PhysicalExpr for Literal { Ok(ColumnarValue::Scalar(self.value.clone())) } + fn evaluate_pruning(&self, ctx: Arc) -> Result { + let length = ctx.statistics().num_containers(); + let range = RangeStats::new_scalar(self.value.clone(), length)?; + let set_stats = if self.value.is_null() { + None + } else { + let mut set = HashSet::new(); + set.insert(self.value.clone()); + let sets = vec![Some(set); length]; + Some(SetStats::new(sets, length)?) + }; + + Ok(PruningIntermediate::IntermediateStats( + ColumnStats::new_with_set_stats(Some(range), None, set_stats), + )) + } + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index e6a6db75bebd7..182fda5474036 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -48,6 +48,7 @@ use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, Volatility, expr_vec_fmt, }; +use datafusion_physical_expr_common::physical_expr::SetStats; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { @@ -336,6 +337,13 @@ impl PhysicalExpr for ScalarFunctionExpr { self.fun.propagate_constraints(interval, children) } + fn propagate_set_stats( + &self, + child_set_stats: &[SetStats], + ) -> Result> { + self.fun.propagate_set_stats(child_set_stats) + } + fn get_properties(&self, children: &[ExprProperties]) -> Result { let sort_properties = self.fun.output_ordering(children)?; let preserves_lex_ordering = self.fun.preserves_lex_ordering(children)?; diff --git a/datafusion/physical-expr/tests/pruning.rs b/datafusion/physical-expr/tests/pruning.rs new file mode 100644 index 0000000000000..4b44fe21d12cc --- /dev/null +++ b/datafusion/physical-expr/tests/pruning.rs @@ -0,0 +1,634 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod pruning_utils; + +mod test { + use std::collections::HashSet; + use std::sync::Arc; + + use arrow::array::{ArrayRef, BooleanArray, Int32Array, Int64Array, UInt64Array}; + use arrow::datatypes::{DataType, Field, FieldRef, Schema}; + use datafusion_common::ScalarValue; + use datafusion_common::config::ConfigOptions; + use datafusion_common::pruning::PruningStatistics; + use datafusion_expr::Operator; + use datafusion_functions::string; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::ScalarFunctionExpr; + use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, in_list, is_null, lit, + }; + use datafusion_physical_expr_common::physical_expr::{ + ColumnStats, NullPresence, PruningContext, PruningIntermediate, PruningResult, + RangeStats, + }; + + use crate::pruning_utils::{ + DummyStats, MockPruningStatistics, MultiColumnPruningStatistics, + }; + + #[test] + fn column_pruning_uses_parquet_stats() { + // Dummy stats: two containers with constant value 10 and 3 rows each. + let pruning_stats = Arc::new(MockPruningStatistics::from_scalar( + "a", + ScalarValue::Int32(Some(10)), + 2, + 3, + )); + + let context = Arc::new(PruningContext::new(pruning_stats)); + let column_expr = Column::new("a", 0); + + match column_expr.evaluate_pruning(context).unwrap() { + PruningIntermediate::IntermediateStats(stats) => { + let range_stats = stats.range_stats().expect("range stats"); + assert_eq!(range_stats.len(), 2); + + let (mins, maxs) = match range_stats { + RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + .. + } => (mins.clone(), maxs.clone()), + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) + .unwrap(); + (arr.clone(), arr) + } + _ => panic!("missing min/max stats"), + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int32Array::from(vec![Some(10), Some(10)])); + assert_eq!(maxs, &Int32Array::from(vec![Some(10), Some(10)])); + + let null_stats = stats.null_stats().expect("null stats"); + assert_eq!(null_stats.len(), 2); + + assert_eq!( + null_stats.presence(), + &[NullPresence::NoNull, NullPresence::NoNull] + ); + } + other => panic!("expected stats, got {other:?}"), + } + } + + #[test] + fn column_set_stats_present() { + let mins = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let maxs = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let null_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(0))]).unwrap(); + let row_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(1))]).unwrap(); + + let mut set = HashSet::new(); + set.insert(ScalarValue::Utf8(Some("foo".to_string()))); + set.insert(ScalarValue::Utf8(Some("bar".to_string()))); + + let pruning_stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + Some(mins), + Some(maxs), + Some(null_counts), + Some(row_counts), + Some(vec![Some(set.clone())]), + )); + + let context = Arc::new(PruningContext::new(pruning_stats)); + let column_expr = Column::new("c", 0); + + match column_expr.evaluate_pruning(context).unwrap() { + PruningIntermediate::IntermediateStats(stats) => { + let set_stats = stats.set_stats().expect("set stats"); + assert_eq!(set_stats.len(), 1); + let container_set = set_stats.value_sets()[0] + .as_ref() + .expect("value set present"); + assert_eq!(container_set, &set); + } + other => panic!("expected stats, got {other:?}"), + } + } + + #[derive(Clone)] + struct LenOnlyStats(usize); + + impl PruningStatistics for LenOnlyStats { + fn min_values(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn max_values(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn num_containers(&self) -> usize { + self.0 + } + + fn null_counts(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn row_counts(&self, _: &datafusion_common::Column) -> Option { + None + } + + fn contained( + &self, + _: &datafusion_common::Column, + _: &HashSet, + ) -> Option { + None + } + } + + #[test] + fn lit_basic() { + let lit_expr = lit(5); + let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); + let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); + + match stat { + PruningIntermediate::IntermediateStats(ColumnStats { + range_stats: Some(RangeStats::Scalar { value, length }), + null_stats, + set_stats, + }) => { + assert_eq!(value, ScalarValue::Int32(Some(5))); + assert_eq!(length, 0); + assert!(null_stats.is_none()); + let set_stats = set_stats.expect("set stats"); + assert_eq!(set_stats.len(), 0); + } + other => panic!("unexpected pruning result: {other:?}"), + } + } + + #[test] + fn literal_set_stats_present() { + let lit_expr = lit("foo"); + let ctx = Arc::new(PruningContext::new(Arc::new(LenOnlyStats(2)))); + let stat = lit_expr.evaluate_pruning(ctx).expect("pruning ok"); + + match stat { + PruningIntermediate::IntermediateStats(ColumnStats { set_stats, .. }) => { + let set_stats = set_stats.expect("set stats"); + assert_eq!(set_stats.len(), 2); + for values in set_stats.value_sets() { + let set = values.as_ref().expect("value set"); + assert_eq!(set.len(), 1); + assert!(set.contains(&ScalarValue::Utf8(Some("foo".to_string())))); + } + } + other => panic!("unexpected pruning result: {other:?}"), + } + } + + #[test] + fn range_stats_scalar_variant() { + let stats = RangeStats::new_scalar(ScalarValue::Int64(Some(42)), 3).unwrap(); + assert_eq!(stats.len(), 3); + + let (mins, maxs) = match &stats { + RangeStats::Scalar { value, length } => { + let arr = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(*length), + ) + .unwrap(); + (arr.clone(), arr) + } + RangeStats::Values { mins, maxs, .. } => { + (mins.clone().expect("mins"), maxs.clone().expect("maxs")) + } + }; + + let mins = mins.as_any().downcast_ref::().unwrap(); + let maxs = maxs.as_any().downcast_ref::().unwrap(); + assert_eq!(mins, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + assert_eq!(maxs, &Int64Array::from(vec![Some(42), Some(42), Some(42)])); + } + + #[test] + fn compare_literal_literal_prunes() { + let expr = BinaryExpr::new(lit(5), Operator::Gt, lit(3)); + let ctx = Arc::new(PruningContext::new(Arc::new(DummyStats))); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::KeepAll]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn compare_column_literal_prunes() { + let stats = Arc::new(MockPruningStatistics::from_scalar( + "a", + ScalarValue::Int32(Some(10)), + 2, + 3, + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Lt, lit(5)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::SkipAll; 2]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn compare_column_ranges_prunes() { + // All ranges strictly greater than the literal -> AlwaysTrue + let mins = Arc::new(Int32Array::from(vec![Some(10), Some(8)])); + let maxs = Arc::new(Int32Array::from(vec![Some(20), Some(12)])); + let zeros = Arc::new(UInt64Array::from(vec![Some(0), Some(0)])); + let rows = Arc::new(UInt64Array::from(vec![Some(3), Some(3)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + Some(mins.clone()), + Some(maxs.clone()), + Some(zeros.clone()), + Some(rows.clone()), + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Gt, lit(5)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::KeepAll; 2]) + } + other => panic!("unexpected result: {other:?}"), + } + + // Mixed range that overlaps the literal -> Unknown + let mins = Arc::new(Int32Array::from(vec![Some(1), Some(2)])); + let maxs = Arc::new(Int32Array::from(vec![Some(9), Some(4)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + Some(mins), + Some(maxs), + Some(zeros), + Some(rows), + )); + let ctx = Arc::new(PruningContext::new(stats)); + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![PruningResult::Unknown, PruningResult::SkipAll] + ) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn multiply_range_prunes() { + let mins = Arc::new(Int32Array::from(vec![Some(10)])); + let maxs = Arc::new(Int32Array::from(vec![Some(20)])); + let zeros = Arc::new(UInt64Array::from(vec![Some(0)])); + let rows = Arc::new(UInt64Array::from(vec![Some(1)])); + let stats = Arc::new(MockPruningStatistics::new( + "a", + Some(mins), + Some(maxs), + Some(zeros), + Some(rows), + )); + let ctx = Arc::new(PruningContext::new(stats)); + + let mult_expr = + BinaryExpr::new(Arc::new(Column::new("a", 0)), Operator::Multiply, lit(7)); + let mult_stats = mult_expr + .evaluate_pruning(Arc::clone(&ctx)) + .expect("multiplication pruning ok"); + match mult_stats { + PruningIntermediate::IntermediateStats(ColumnStats { + range_stats: + Some(RangeStats::Values { + mins: Some(mins), + maxs: Some(maxs), + length, + }), + null_stats, + set_stats, + }) => { + assert_eq!(length, 1); + let min_scalar = + ScalarValue::try_from_array(mins.as_ref(), 0).expect("min scalar"); + let max_scalar = + ScalarValue::try_from_array(maxs.as_ref(), 0).expect("max scalar"); + let min_scalar = min_scalar.cast_to(&DataType::Int64).unwrap(); + let max_scalar = max_scalar.cast_to(&DataType::Int64).unwrap(); + assert_eq!(min_scalar, ScalarValue::Int64(Some(70))); + assert_eq!(max_scalar, ScalarValue::Int64(Some(140))); + assert!(null_stats.is_none()); + assert!(set_stats.is_none()); + } + other => panic!("unexpected pruning result: {other:?}"), + } + + // (a*7) < 10 + let expr = BinaryExpr::new(Arc::new(mult_expr), Operator::Lt, lit(10)); + + let res = expr.evaluate_pruning(ctx).unwrap(); + match res { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::SkipAll]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn is_null_prunes_when_no_nulls() { + let stats = Arc::new(MockPruningStatistics::from_scalar( + "b", + ScalarValue::Int32(Some(1)), + 1, + 3, + )); + let ctx = Arc::new(PruningContext::new(stats)); + let expr = is_null(Arc::new(Column::new("b", 0))).unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::SkipAll]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn in_list_literal_set_prunes() { + let ctx = Arc::new(PruningContext::new(Arc::new(LenOnlyStats(1)))); + let schema = Schema::new(Vec::::new()); + let expr = + in_list(lit("foo"), vec![lit("foo"), lit("bar")], &false, &schema).unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::KeepAll]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn in_list_prunes_with_set_stats() { + let mins = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let maxs = + ScalarValue::iter_to_array(vec![ScalarValue::Utf8(Some("foo".to_string()))]) + .unwrap(); + let null_counts = + ScalarValue::iter_to_array(vec![ScalarValue::UInt64(Some(0))]).unwrap(); + + let mut value_set = HashSet::new(); + value_set.insert(ScalarValue::Utf8(Some("foo".to_string()))); + value_set.insert(ScalarValue::Utf8(Some("bar".to_string()))); + + let stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + Some(mins), + Some(maxs), + Some(null_counts), + None, + Some(vec![Some(value_set)]), + )); + + let ctx = Arc::new(PruningContext::new(stats)); + let schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]); + let expr = in_list( + Arc::new(Column::new("c", 0)), + vec![lit("toy"), lit("book")], + &false, + &schema, + ) + .unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!(results, vec![PruningResult::SkipAll]) + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn in_list_prunes_upper_with_set_stats() { + let mins = ScalarValue::iter_to_array(vec![ + ScalarValue::Utf8(Some("electronic".to_string())), + ScalarValue::Utf8(Some("chair".to_string())), + ScalarValue::Utf8(Some("book".to_string())), + ]) + .unwrap(); + let maxs = ScalarValue::iter_to_array(vec![ + ScalarValue::Utf8(Some("electronic".to_string())), + ScalarValue::Utf8(Some("chair".to_string())), + ScalarValue::Utf8(Some("pencil".to_string())), + ]) + .unwrap(); + let null_counts = ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(0)), + ]) + .unwrap(); + + let mut set_true = HashSet::new(); + set_true.insert(ScalarValue::Utf8(Some("electronic".to_string()))); + + let mut set_false = HashSet::new(); + set_false.insert(ScalarValue::Utf8(Some("chair".to_string()))); + + let mut set_unknown = HashSet::new(); + set_unknown.insert(ScalarValue::Utf8(Some("book".to_string()))); + set_unknown.insert(ScalarValue::Utf8(Some("pencil".to_string()))); + + let stats = Arc::new(MockPruningStatistics::new_with_sets( + "c", + Some(mins), + Some(maxs), + Some(null_counts), + None, + Some(vec![Some(set_true), Some(set_false), Some(set_unknown)]), + )); + + let ctx = Arc::new(PruningContext::new(stats)); + let schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]); + let upper_udf = string::upper(); + let upper_expr = ScalarFunctionExpr::try_new( + upper_udf, + vec![Arc::new(Column::new("c", 0)) as Arc], + &schema, + Arc::new(ConfigOptions::new()), + ) + .unwrap(); + + let expr = in_list( + Arc::new(upper_expr), + vec![lit("ELECTRONIC"), lit("BOOK")], + &false, + &schema, + ) + .unwrap(); + + match expr.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![ + PruningResult::KeepAll, + PruningResult::SkipAll, + PruningResult::Unknown + ] + ); + } + other => panic!("unexpected result: {other:?}"), + } + } + + #[test] + fn logical_or_prunes_combined_predicate() { + let num_containers = 3; + let mut stats = MultiColumnPruningStatistics::new(num_containers); + + // a ranges + stats.mins.insert( + "a".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::Int32(Some(2)), + ScalarValue::Int32(Some(0)), + ScalarValue::Int32(Some(0)), + ]) + .unwrap(), + ); + stats.maxs.insert( + "a".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::Int32(Some(3)), + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(3)), + ]) + .unwrap(), + ); + + // b null counts + stats.null_counts.insert( + "b".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(0)), + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(1)), + ]) + .unwrap(), + ); + stats.row_counts.insert( + "b".to_string(), + ScalarValue::iter_to_array(vec![ + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(5)), + ScalarValue::UInt64(Some(5)), + ]) + .unwrap(), + ); + + // c value sets + let mut set_true = HashSet::new(); + set_true.insert(ScalarValue::Utf8(Some("electronic".to_string()))); + + let mut set_false = HashSet::new(); + set_false.insert(ScalarValue::Utf8(Some("chair".to_string()))); + + let mut set_unknown = HashSet::new(); + set_unknown.insert(ScalarValue::Utf8(Some("book".to_string()))); + set_unknown.insert(ScalarValue::Utf8(Some("pencil".to_string()))); + + stats.value_sets.insert( + "c".to_string(), + vec![Some(set_true), Some(set_false), Some(set_unknown)], + ); + + let ctx = Arc::new(PruningContext::new(Arc::new(stats))); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Utf8, true), + ]); + + let mult_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Multiply, + lit(7), + )); + let a_gt = Arc::new(BinaryExpr::new(mult_expr, Operator::Gt, lit(10))); + let b_is_null = is_null(Arc::new(Column::new("b", 1))).unwrap(); + + let upper_udf = string::upper(); + let upper_expr = ScalarFunctionExpr::try_new( + upper_udf, + vec![Arc::new(Column::new("c", 2)) as Arc], + &schema, + Arc::new(ConfigOptions::new()), + ) + .unwrap(); + let c_in_list = in_list( + Arc::new(upper_expr), + vec![lit("ELECTRONIC"), lit("BOOK")], + &false, + &schema, + ) + .unwrap(); + + let left_or = + Arc::new(BinaryExpr::new(a_gt, Operator::Or, Arc::clone(&b_is_null))); + let predicate = BinaryExpr::new(left_or, Operator::Or, c_in_list); + + match predicate.evaluate_pruning(ctx).unwrap() { + PruningIntermediate::IntermediateResult(results) => { + assert_eq!( + results, + vec![ + PruningResult::KeepAll, + PruningResult::KeepAll, + PruningResult::Unknown + ] + ); + } + other => panic!("unexpected pruning result: {other:?}"), + } + } +} diff --git a/datafusion/physical-expr/tests/pruning_utils.rs b/datafusion/physical-expr/tests/pruning_utils.rs new file mode 100644 index 0000000000000..bf61daceb97c7 --- /dev/null +++ b/datafusion/physical-expr/tests/pruning_utils.rs @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_common::{Column, ScalarValue}; + +pub struct DummyStats; + +impl PruningStatistics for DummyStats { + fn min_values(&self, _: &Column) -> Option { + None + } + fn max_values(&self, _: &Column) -> Option { + None + } + fn num_containers(&self) -> usize { + 0 + } + fn null_counts(&self, _: &Column) -> Option { + None + } + fn row_counts(&self, _: &Column) -> Option { + None + } + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} + +/// Simple `PruningStatistics` implementation backed by literal arrays. +/// Useful for testing expression pruning semantics without plumbing +/// actual file/row group metadata. +pub struct MockPruningStatistics { + column: String, + min_values: Option, + max_values: Option, + null_counts: Option, + row_counts: Option, + num_containers: usize, + value_sets: Option>>>, +} + +impl MockPruningStatistics { + pub fn new( + column: impl Into, + min_values: Option, + max_values: Option, + null_counts: Option, + row_counts: Option, + ) -> Self { + Self::new_with_sets( + column, + min_values, + max_values, + null_counts, + row_counts, + None, + ) + } + + pub fn new_with_sets( + column: impl Into, + min_values: Option, + max_values: Option, + null_counts: Option, + row_counts: Option, + value_sets: Option>>>, + ) -> Self { + let num_containers = min_values + .as_ref() + .or(max_values.as_ref()) + .or(null_counts.as_ref()) + .or(row_counts.as_ref()) + .map(|a| a.len()) + .unwrap_or(0); + if let Some(value_sets) = value_sets.as_ref() { + assert_eq!( + value_sets.len(), + num_containers, + "value sets must match container count" + ); + } + Self { + column: column.into(), + min_values, + max_values, + null_counts, + row_counts, + num_containers, + value_sets, + } + } + + /// Convenience constructor for uniform literal stats. + pub fn from_scalar( + column: impl Into, + value: ScalarValue, + num_containers: usize, + rows_per_container: u64, + ) -> Self { + let mins = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(num_containers), + ) + .expect("scalar to array"); + let maxs = ScalarValue::iter_to_array( + std::iter::repeat(value.clone()).take(num_containers), + ) + .expect("scalar to array"); + let null_counts = ScalarValue::iter_to_array( + std::iter::repeat(ScalarValue::UInt64(Some(0))).take(num_containers), + ) + .expect("zeros"); + let row_counts = ScalarValue::iter_to_array( + std::iter::repeat(ScalarValue::UInt64(Some(rows_per_container))) + .take(num_containers), + ) + .expect("rows"); + + Self::new( + column, + Some(mins), + Some(maxs), + Some(null_counts), + Some(row_counts), + ) + } +} + +impl PruningStatistics for MockPruningStatistics { + fn min_values(&self, column: &Column) -> Option { + (column.name == self.column) + .then(|| self.min_values.as_ref().map(Arc::clone)) + .flatten() + } + + fn max_values(&self, column: &Column) -> Option { + (column.name == self.column) + .then(|| self.max_values.as_ref().map(Arc::clone)) + .flatten() + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, column: &Column) -> Option { + (column.name == self.column) + .then(|| self.null_counts.as_ref().map(Arc::clone)) + .flatten() + } + + fn row_counts(&self, column: &Column) -> Option { + (column.name == self.column) + .then(|| self.row_counts.as_ref().map(Arc::clone)) + .flatten() + } + + fn value_sets(&self, column: &Column) -> Option>>> { + if column.name == self.column { + self.value_sets.clone() + } else { + None + } + } + + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} + +#[derive(Clone)] +pub struct MultiColumnPruningStatistics { + pub mins: HashMap, + pub maxs: HashMap, + pub null_counts: HashMap, + pub row_counts: HashMap, + pub value_sets: HashMap>>>, + pub num_containers: usize, +} + +impl MultiColumnPruningStatistics { + pub fn new(num_containers: usize) -> Self { + Self { + mins: HashMap::new(), + maxs: HashMap::new(), + null_counts: HashMap::new(), + row_counts: HashMap::new(), + value_sets: HashMap::new(), + num_containers, + } + } +} + +impl PruningStatistics for MultiColumnPruningStatistics { + fn min_values(&self, column: &Column) -> Option { + self.mins.get(&column.name).cloned() + } + + fn max_values(&self, column: &Column) -> Option { + self.maxs.get(&column.name).cloned() + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, column: &Column) -> Option { + self.null_counts.get(&column.name).cloned() + } + + fn row_counts(&self, column: &Column) -> Option { + self.row_counts.get(&column.name).cloned() + } + + fn value_sets(&self, column: &Column) -> Option>>> { + self.value_sets.get(&column.name).cloned() + } + + fn contained(&self, _: &Column, _: &HashSet) -> Option { + None + } +} diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index 9f8142447ba69..318645d4e8283 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -19,6 +19,7 @@ #![deny(clippy::allow_attributes)] mod file_pruner; +pub mod pruner; mod pruning_predicate; pub use file_pruner::FilePruner; diff --git a/datafusion/pruning/src/pruner.rs b/datafusion/pruning/src/pruner.rs new file mode 100644 index 0000000000000..cc11c165a0f61 --- /dev/null +++ b/datafusion/pruning/src/pruner.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::ArrayRef;