Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions datafusion/common/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::{ColumnStatistics, ScalarValue};
///
/// 3. Whether the values in a column are contained in a set of literals
///
/// 4. Known value sets for columns, on a per-container basis
///
/// # Vectorized Interface
///
/// Information for containers / files are returned as Arrow [`ArrayRef`], so
Expand Down Expand Up @@ -105,6 +107,19 @@ pub trait PruningStatistics {
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the known set of values for the named column in each container.
///
/// The returned vector must contain [`Self::num_containers`] entries,
/// one per container. Each entry is:
/// * `Some(HashSet<ScalarValue>)` if the values for that container are known
/// * `None` if the set is unknown for that container.
///
/// Return `None` (instead of a vector) when no set statistics are available
/// for this column at all. The default implementation always returns `None`
/// and ignores `_column`, so implementors that do not track value sets do
/// not need to override it.
fn value_sets(&self, _column: &Column) -> Option<Vec<Option<HashSet<ScalarValue>>>> {
None
}

/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
///
Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ impl ScalarUDFImpl for AsyncScalarUDF {
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// `_args` is ignored: this entry point unconditionally reports an internal
// error, because async functions are not supposed to be called directly.
internal_err!("async functions should not be called directly")
}

/// Propagates set statistics by forwarding `child_set_stats` to the wrapped
/// `inner` implementation and returning its result (or error) unchanged.
fn propagate_set_stats(
&self,
child_set_stats: &[crate::SetStats],
) -> Result<Option<crate::SetStats>> {
self.inner.propagate_set_stats(child_set_stats)
}
}

impl Display for AsyncScalarUDF {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub use datafusion_expr_common::signature::{
TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility,
};
pub use datafusion_expr_common::type_coercion::binary;
pub use datafusion_physical_expr_common::physical_expr::SetStats;
pub use expr::{
Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GroupingSet, Like,
Sort as SortExpr, TryCast, WindowFunctionDefinition,
Expand Down
28 changes: 27 additions & 1 deletion datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::expr::schema_name_from_exprs_comma_separated_without_space;
use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
use crate::sort_properties::{ExprProperties, SortProperties};
use crate::udf_eq::UdfEq;
use crate::{ColumnarValue, Documentation, Expr, Signature};
use crate::{ColumnarValue, Documentation, Expr, SetStats, Signature};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Expand Down Expand Up @@ -321,6 +321,14 @@ impl ScalarUDF {
self.inner.propagate_constraints(interval, inputs)
}

/// Propagate set statistics through this function.
///
/// Forwards `child_set_stats` to `self.inner` and returns whatever it
/// produces, including any error. `Ok(None)` is passed through as-is.
pub fn propagate_set_stats(
&self,
child_set_stats: &[SetStats],
) -> Result<Option<SetStats>> {
self.inner.propagate_set_stats(child_set_stats)
}

/// Calculates the [`SortProperties`] of this function based on its
/// children's properties.
pub fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties> {
Expand Down Expand Up @@ -785,6 +793,17 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
Ok(Some(vec![]))
}

/// Propagate set statistics from children through this function.
///
/// Returns `Ok(None)` by default, indicating the function does not support
/// set-stat based pruning. The default ignores `_child_set_stats` entirely.
///
/// Implementations that can derive the set statistics of their output from
/// those of their children should override this method and return
/// `Ok(Some(..))`.
fn propagate_set_stats(
&self,
_child_set_stats: &[SetStats],
) -> Result<Option<SetStats>> {
Ok(None)
}

/// Calculates the [`SortProperties`] of this function based on its children's properties.
fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties> {
if !self.preserves_lex_ordering(inputs)? {
Expand Down Expand Up @@ -949,6 +968,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.propagate_constraints(interval, inputs)
}

/// Propagates set statistics by forwarding `child_set_stats` to the wrapped
/// `inner` implementation and returning its result (or error) unchanged.
fn propagate_set_stats(
&self,
child_set_stats: &[SetStats],
) -> Result<Option<SetStats>> {
self.inner.propagate_set_stats(child_set_stats)
}

/// Computes the output ordering by forwarding `inputs` to the wrapped
/// `inner` implementation and returning its result (or error) unchanged.
fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties> {
self.inner.output_ordering(inputs)
}
Expand Down
51 changes: 48 additions & 3 deletions datafusion/functions/src/string/upper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
use crate::string::common::to_upper;
use crate::utils::utf8_to_str_type;
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::types::logical_string;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
TypeSignatureClass, Volatility,
Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, SetStats,
Signature, TypeSignatureClass, Volatility,
};
use datafusion_macros::user_doc;
use std::any::Any;
use std::collections::HashSet;

#[user_doc(
doc_section(label = "String Functions"),
Expand Down Expand Up @@ -88,6 +89,50 @@ impl ScalarUDFImpl for UpperFunc {
to_upper(&args.args, "upper")
}

/// Derives the set statistics of `upper(c)` from those of its single argument
/// by upper-casing every known value.
///
/// Example: if `c` has the set stat `{'foo', 'bar'}` for a container, then
/// `upper(c)` has the set stat `{'FOO', 'BAR'}` for that container.
/// Containers whose value set is unknown (`None`) stay unknown.
///
/// Returns `Ok(None)` (no propagation) when the number of children is not
/// exactly one, or when any known value is not a `Utf8`, `LargeUtf8` or
/// `Utf8View` scalar.
fn propagate_set_stats(
    &self,
    child_set_stats: &[SetStats],
) -> Result<Option<SetStats>> {
    // Only a single child can be handled; any other arity is not propagated.
    let [child] = child_set_stats else {
        return Ok(None);
    };

    // Upper-cases the string payload of a scalar, keeping a missing payload as `None`.
    let uppercased = |text: Option<&String>| text.map(|s| s.to_uppercase());

    let container_count = child.len();
    let mut propagated = Vec::with_capacity(container_count);

    for container in child.value_sets() {
        let mapped = match container {
            // Unknown set for this container: nothing to transform.
            None => None,
            Some(known) => {
                let mut transformed = HashSet::with_capacity(known.len());
                for scalar in known {
                    let converted = match scalar {
                        ScalarValue::Utf8(text) => {
                            ScalarValue::Utf8(uppercased(text.as_ref()))
                        }
                        ScalarValue::LargeUtf8(text) => {
                            ScalarValue::LargeUtf8(uppercased(text.as_ref()))
                        }
                        ScalarValue::Utf8View(text) => {
                            ScalarValue::Utf8View(uppercased(text.as_ref()))
                        }
                        // Any non-string scalar aborts propagation for the whole call.
                        _ => return Ok(None),
                    };
                    transformed.insert(converted);
                }
                Some(transformed)
            }
        };
        propagated.push(mapped);
    }

    SetStats::new(propagated, container_count).map(Some)
}

fn documentation(&self) -> Option<&Documentation> {
// Returns whatever `self.doc()` provides; `doc` is defined outside this
// block (presumably generated by the `#[user_doc]` attribute — confirm).
self.doc()
}
Expand Down
166 changes: 166 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ use datafusion_expr_common::statistics::Distribution;

use itertools::izip;

mod pruning;

pub use pruning::{
ColumnStats, NullPresence, NullStats, PruningContext, PruningIntermediate,
PruningResult, RangeStats, SetStats,
};

/// Shared [`PhysicalExpr`].
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;

Expand Down Expand Up @@ -430,6 +437,165 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn is_volatile_node(&self) -> bool {
// Default implementation: always reports `false`.
false
}

// ---------------
// Pruning related
// ---------------
// None means propagation not implemented/supported for this node
/// Propagates range statistics from this node's children to the node itself.
///
/// The default implementation ignores `_child_range_stats` and returns
/// `Ok(None)`, meaning range-stat propagation is not implemented/supported
/// for this node.
fn propagate_range_stats(
&self,
_child_range_stats: &[RangeStats],
) -> Result<Option<RangeStats>> {
Ok(None)
}

/// Propagates null statistics from this node's children to the node itself.
///
/// `_child_null_stats` carries the null statistics of the children. The
/// default implementation ignores them and returns `Ok(None)`, meaning
/// null-stat propagation is not implemented/supported for this node.
fn propagate_null_stats(
    &self,
    _child_null_stats: &[NullStats],
) -> Result<Option<NullStats>> {
    Ok(None)
}

/// Propagates set statistics from this node's children to the node itself.
///
/// The default implementation ignores `_child_set_stats` and returns
/// `Ok(None)`, meaning set-stat propagation is not implemented/supported
/// for this node.
fn propagate_set_stats(
&self,
_child_set_stats: &[SetStats],
) -> Result<Option<SetStats>> {
Ok(None)
}

/// Default pruning evaluation for nodes that only forward statistics from
/// their children (e.g. arithmetic expressions).
///
/// 1. A node without children yields `PruningIntermediate::empty_stats()`.
/// 2. Every child is evaluated with the same `ctx`. If a child yields anything
///    other than `IntermediateStats`, that value is returned unchanged.
/// 3. For each kind of statistic (range, null, set), the matching
///    `propagate_*_stats` hook is invoked only if every child supplied that
///    kind; otherwise that kind is `None` in the result.
///
/// # Errors
///
/// Propagates errors from child evaluation and from the `propagate_*_stats`
/// hooks. Length mismatches between children, or between the propagated
/// statistics, are reported through `assert_eq_or_internal_err!`.
fn evaluate_pruning(&self, ctx: Arc<PruningContext>) -> Result<PruningIntermediate> {
    let children = self.children();
    if children.is_empty() {
        return Ok(PruningIntermediate::empty_stats());
    }

    let child_count = children.len();
    let mut ranges = Vec::with_capacity(child_count);
    let mut nulls = Vec::with_capacity(child_count);
    let mut sets = Vec::with_capacity(child_count);

    // Each flag stays `true` only while every child seen so far supplied
    // that kind of statistic.
    let mut all_have_range = true;
    let mut all_have_null = true;
    let mut all_have_set = true;

    for child in children {
        let stats = match child.evaluate_pruning(Arc::clone(&ctx))? {
            PruningIntermediate::IntermediateStats(stats) => stats,
            // Without node-specific semantics, we can't combine a final pruning result here.
            other => return Ok(other),
        };

        match (all_have_range, stats.range_stats) {
            (true, Some(range)) => ranges.push(range),
            _ => all_have_range = false,
        }

        match (all_have_null, stats.null_stats) {
            (true, Some(null)) => nulls.push(null),
            _ => all_have_null = false,
        }

        match (all_have_set, stats.set_stats) {
            (true, Some(set)) => sets.push(set),
            _ => all_have_set = false,
        }
    }

    let range_stats = if all_have_range && !ranges.is_empty() {
        // Every child must describe the same number of entries as the first one.
        let mut remaining = ranges.iter();
        if let Some(reference) = remaining.next() {
            for candidate in remaining {
                assert_eq_or_internal_err!(
                    reference.len(),
                    candidate.len(),
                    "Range stats length mismatch between pruning children"
                );
            }
        }
        self.propagate_range_stats(&ranges)?
    } else {
        None
    };

    let null_stats = if all_have_null && !nulls.is_empty() {
        let mut remaining = nulls.iter();
        if let Some(reference) = remaining.next() {
            for candidate in remaining {
                assert_eq_or_internal_err!(
                    reference.len(),
                    candidate.len(),
                    "Null stats length mismatch between pruning children"
                );
            }
        }
        self.propagate_null_stats(&nulls)?
    } else {
        None
    };

    let set_stats = if all_have_set && !sets.is_empty() {
        let mut remaining = sets.iter();
        if let Some(reference) = remaining.next() {
            for candidate in remaining {
                assert_eq_or_internal_err!(
                    reference.len(),
                    candidate.len(),
                    "Set stats length mismatch between pruning children"
                );
            }
        }
        self.propagate_set_stats(&sets)?
    } else {
        None
    };

    // Whatever kinds were propagated must agree on their length.
    if let (Some(range), Some(null)) = (&range_stats, &null_stats) {
        assert_eq_or_internal_err!(
            range.len(),
            null.len(),
            "Range and null stats length mismatch for pruning"
        );
    }

    if let (Some(range), Some(set)) = (&range_stats, &set_stats) {
        assert_eq_or_internal_err!(
            range.len(),
            set.len(),
            "Range and set stats length mismatch for pruning"
        );
    }

    if let (Some(null), Some(set)) = (&null_stats, &set_stats) {
        assert_eq_or_internal_err!(
            null.len(),
            set.len(),
            "Null and set stats length mismatch for pruning"
        );
    }

    let combined = ColumnStats {
        range_stats,
        null_stats,
        set_stats,
    };
    Ok(PruningIntermediate::IntermediateStats(combined))
}
}

#[deprecated(
Expand Down
Loading
Loading