diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 93df300bb50b4..8b123dd3ef79a 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -132,6 +132,8 @@ fn optimize_projections( config: &dyn OptimizerConfig, indices: RequiredIndices, ) -> Result> { + let volatile_in_plan = plan.expressions().iter().any(Expr::is_volatile); + // Recursively rewrite any nodes that may be able to avoid computation given // their parents' required indices. match plan { @@ -141,6 +143,7 @@ fn optimize_projections( }); } LogicalPlan::Aggregate(aggregate) => { + let has_volatile_ancestor = indices.has_volatile_ancestor(); // Split parent requirements to GROUP BY and aggregate sections: let n_group_exprs = aggregate.group_expr_len()?; // Offset aggregate indices so that they point to valid indices at @@ -188,6 +191,14 @@ fn optimize_projections( let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); + let mut necessary_indices = if new_aggr_expr.is_empty() { + necessary_indices.for_multiplicity_insensitive_child() + } else { + necessary_indices.for_multiplicity_sensitive_child() + }; + necessary_indices = necessary_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); return optimize_projections( Arc::unwrap_or_clone(aggregate.input), @@ -213,6 +224,7 @@ fn optimize_projections( }); } LogicalPlan::Window(window) => { + let has_volatile_ancestor = indices.has_volatile_ancestor(); let input_schema = Arc::clone(window.input.schema()); // Split parent requirements to child and window expression sections: let n_input_fields = input_schema.fields().len(); @@ -227,6 +239,14 @@ fn optimize_projections( // Get all the required column indices at the input, either by the // parent or window expression requirements. let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); + let mut required_indices = if new_window_expr.is_empty() { + required_indices.for_multiplicity_insensitive_child() + } else { + required_indices.for_multiplicity_sensitive_child() + }; + required_indices = required_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -293,10 +313,11 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - indices + let required = indices .clone() .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema()) + .with_plan_exprs(&plan, input.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>()? } @@ -307,7 +328,11 @@ fn optimize_projections( // flag is `false`. plan.inputs() .into_iter() - .map(|input| indices.clone().with_plan_exprs(&plan, input.schema())) + .map(|input| { + let required = + indices.clone().with_plan_exprs(&plan, input.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) + }) .collect::>()? } LogicalPlan::Copy(_) @@ -316,8 +341,7 @@ fn optimize_projections( | LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Subquery(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Distinct(Distinct::All(_)) => { + | LogicalPlan::Statement(_) => { // These plans require all their fields, and their children should // be treated as final plans -- otherwise, we may have schema a // mismatch. @@ -325,8 +349,11 @@ fn optimize_projections( // EXISTS expression), we may not need to require all indices. plan.inputs() .into_iter() - .map(RequiredIndices::new_for_all_exprs) - .collect() + .map(|input| { + let required = RequiredIndices::new_for_all_exprs(input); + Ok(required.with_plan_volatile(volatile_in_plan)) + }) + .collect::>()? } LogicalPlan::Extension(extension) => { let Some(necessary_children_indices) = @@ -348,8 +375,9 @@ fn optimize_projections( .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - RequiredIndices::new_from_indices(necessary_indices) - .with_plan_exprs(&plan, child.schema()) + let required = RequiredIndices::new_from_indices(necessary_indices) + .with_plan_exprs(&plan, child.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>>()? } @@ -376,10 +404,11 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - indices + let required = indices .clone() .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema()) + .with_plan_exprs(&plan, input.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>>()? } @@ -391,6 +420,12 @@ fn optimize_projections( left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = right_req_indices.with_plan_exprs(&plan, join.right.schema())?; + let left_indices = left_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); + let right_indices = right_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); // Joins benefit from "small" input tables (lower memory usage). // Therefore, each child benefits from projection: vec![ @@ -398,6 +433,15 @@ fn optimize_projections( right_indices.with_projection_beneficial(), ] } + LogicalPlan::Distinct(Distinct::All(_)) => plan + .inputs() + .into_iter() + .map(|input| { + let required = RequiredIndices::new_for_all_exprs(input) + .for_multiplicity_insensitive_child(); + Ok(required.with_plan_volatile(volatile_in_plan)) + }) + .collect::>()?, // these nodes are explicitly rewritten in the match statement above LogicalPlan::Projection(_) | LogicalPlan::Aggregate(_) @@ -407,19 +451,29 @@ fn optimize_projections( "OptimizeProjection: should have handled in the match statement above" ); } - LogicalPlan::Unnest(Unnest { - input, - dependency_indices, - .. - }) => { + LogicalPlan::Unnest(unnest) => { + if can_eliminate_unnest(unnest, &indices) { + let child_required_indices = + build_unnest_child_requirements(unnest, &indices); + let transformed_input = optimize_projections( + Arc::unwrap_or_clone(Arc::clone(&unnest.input)), + config, + child_required_indices, + )?; + return Ok(Transformed::yes(transformed_input.data)); + } // at least provide the indices for the exec-columns as a starting point - let required_indices = - RequiredIndices::new().with_plan_exprs(&plan, input.schema())?; + let mut required_indices = + RequiredIndices::new().with_plan_exprs(&plan, unnest.input.schema())?; + required_indices = required_indices + .for_multiplicity_sensitive_child() + .with_volatile_ancestor_if(indices.has_volatile_ancestor()) + .with_plan_volatile(volatile_in_plan); // Add additional required indices from the parent let mut additional_necessary_child_indices = Vec::new(); indices.indices().iter().for_each(|idx| { - if let Some(index) = dependency_indices.get(*idx) { + if let Some(index) = unnest.dependency_indices.get(*idx) { additional_necessary_child_indices.push(*index); } }); @@ -837,8 +891,14 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); - let required_indices = + let mut required_indices = RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); + if !indices.multiplicity_sensitive() { + required_indices = required_indices.for_multiplicity_insensitive_child(); + } + if indices.has_volatile_ancestor() { + required_indices = required_indices.with_volatile_ancestor(); + } // rewrite the children projection, and if they are changed rewrite the // projection down @@ -909,6 +969,62 @@ fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool { .any(|child| plan_contains_other_subqueries(child, cte_name)) } +fn can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { + if indices.multiplicity_sensitive() || indices.has_volatile_ancestor() { + return false; + } + + // List unnest can drop rows for empty lists even when preserve_nulls=true. + // Without proving non-empty cardinality, keep UNNEST conservatively. + if !unnest.list_type_columns.is_empty() { + return false; + } + + // preserve_nulls only affects list unnest semantics. For struct-only unnest, + // row cardinality is unchanged and this option is not semantically relevant. + + indices + .indices() + .iter() + .all(|&output_idx| unnest_output_is_passthrough(unnest, output_idx)) +} + +fn unnest_output_is_passthrough(unnest: &Unnest, output_idx: usize) -> bool { + let Some(&dependency_idx) = unnest.dependency_indices.get(output_idx) else { + return false; + }; + + if dependency_idx >= unnest.input.schema().fields().len() { + return false; + } + + unnest.schema.qualified_field(output_idx) + == unnest.input.schema().qualified_field(dependency_idx) +} + +fn build_unnest_child_requirements( + unnest: &Unnest, + indices: &RequiredIndices, +) -> RequiredIndices { + let child_indices = indices + .indices() + .iter() + .filter_map(|&output_idx| unnest.dependency_indices.get(output_idx).copied()) + .collect::>(); + let mut child_required_indices = RequiredIndices::new_from_indices(child_indices); + if indices.projection_beneficial() { + child_required_indices = child_required_indices.with_projection_beneficial(); + } + if indices.has_volatile_ancestor() { + child_required_indices = child_required_indices.with_volatile_ancestor(); + } + if !indices.multiplicity_sensitive() { + child_required_indices = + child_required_indices.for_multiplicity_insensitive_child(); + } + child_required_indices +} + fn expr_contains_subquery(expr: &Expr) -> bool { expr.exists(|e| match e { Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true), @@ -953,7 +1069,7 @@ mod tests { use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, + Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, UnnestOptions, }; use datafusion_expr::ExprFunctionExt; use datafusion_expr::{ @@ -2274,6 +2390,103 @@ mod tests { ) } + #[test] + fn eliminate_struct_unnest_when_only_group_keys_are_required() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "user", + DataType::Struct( + vec![ + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Int32, true), + ] + .into(), + ), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("user")? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(!formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_list_unnest_when_group_keys_are_only_required_outputs() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("vals")? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_unnest_when_count_depends_on_row_multiplicity() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("vals")? + .aggregate(vec![col("id")], vec![count(lit(1)).alias("cnt")])? + .project(vec![col("id"), col("cnt")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_unnest_when_preserve_nulls_is_disabled() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column_with_options( + "vals", + UnnestOptions::new().with_preserve_nulls(false), + )? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(formatted.contains("Unnest:")); + Ok(()) + } + #[test] fn test_window() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f2..7053266d0e658 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -34,13 +34,28 @@ use datafusion_expr::{Expr, LogicalPlan}; /// Indices are always in order and without duplicates. For example, if these /// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented /// by `[1, 2, 3, 4, 6]`. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub(super) struct RequiredIndices { /// The indices of the required columns in the indices: Vec, /// If putting a projection above children is beneficial for the parent. /// Defaults to false. projection_beneficial: bool, + /// Whether ancestors can observe row multiplicity changes. + multiplicity_sensitive: bool, + /// Whether any ancestor expression is volatile. + has_volatile_ancestor: bool, +} + +impl Default for RequiredIndices { + fn default() -> Self { + Self { + indices: Vec::new(), + projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, + } + } } impl RequiredIndices { @@ -54,6 +69,8 @@ impl RequiredIndices { Self { indices: (0..plan.schema().fields().len()).collect(), projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, } } @@ -62,6 +79,8 @@ impl RequiredIndices { Self { indices, projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, } .compact() } @@ -77,6 +96,62 @@ impl RequiredIndices { self } + /// Mark this requirement as multiplicity-insensitive. + pub fn with_multiplicity_insensitive(mut self) -> Self { + self.multiplicity_sensitive = false; + self + } + + /// Mark this requirement as multiplicity-sensitive. + pub fn with_multiplicity_sensitive(mut self) -> Self { + self.multiplicity_sensitive = true; + self + } + + /// Return whether ancestors can observe multiplicity changes. + pub fn multiplicity_sensitive(&self) -> bool { + self.multiplicity_sensitive + } + + /// Mark this requirement as having volatile ancestors. + pub fn with_volatile_ancestor(mut self) -> Self { + self.has_volatile_ancestor = true; + self + } + + /// Conditionally mark this requirement as having volatile ancestors. + pub fn with_volatile_ancestor_if(mut self, value: bool) -> Self { + if value { + self.has_volatile_ancestor = true; + } + self + } + + /// Propagate volatile-plan context into this requirement. + /// + /// This keeps call sites declarative and centralizes state-transition logic. + pub fn with_plan_volatile(mut self, volatile_in_plan: bool) -> Self { + if volatile_in_plan { + self.has_volatile_ancestor = true; + } + self + } + + /// Transition this requirement for a multiplicity-sensitive child. + pub fn for_multiplicity_sensitive_child(self) -> Self { + self.with_multiplicity_sensitive() + } + + /// Transition this requirement for a multiplicity-insensitive child. + pub fn for_multiplicity_insensitive_child(self) -> Self { + self.with_multiplicity_insensitive() + } + + /// Return whether a volatile expression exists in the ancestor chain. + pub fn has_volatile_ancestor(&self) -> bool { + self.has_volatile_ancestor + } + /// Return the value of projection beneficial flag pub fn projection_beneficial(&self) -> bool { self.projection_beneficial @@ -173,10 +248,14 @@ impl RequiredIndices { Self { indices: l, projection_beneficial, + multiplicity_sensitive: self.multiplicity_sensitive, + has_volatile_ancestor: self.has_volatile_ancestor, }, Self { indices: r, projection_beneficial, + multiplicity_sensitive: self.multiplicity_sensitive, + has_volatile_ancestor: self.has_volatile_ancestor, }, ) } diff --git a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt new file mode 100644 index 0000000000000..3a44090b2c090 --- /dev/null +++ b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############################### +# Unnest Pruning Safety Tests # +############################### + +statement ok +CREATE TABLE unnest_prune_t +AS VALUES + (1, [1, 2]), + (2, []), + (3, [3]), + (4, null) +; + +statement ok +set datafusion.explain.logical_plan_only = true; + +# Safe case: struct unnest is cardinality-preserving and unnested outputs are dead. +# Unnest should be removed. +statement ok +CREATE TABLE unnest_prune_struct_t +AS VALUES + (1, struct('a', 10)), + (2, struct('b', 20)) +; + +query TT +EXPLAIN SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) + FROM unnest_prune_struct_t +) q +GROUP BY id; +---- +logical_plan +01)Aggregate: groupBy=[[q.id]], aggr=[[]] +02)--SubqueryAlias: q +03)----Projection: unnest_prune_struct_t.column1 AS id +04)------TableScan: unnest_prune_struct_t projection=[column1] + +# Empty-list/null semantics are cardinality-sensitive even if unnested column is dead. +# Unnest must remain. +query TT +EXPLAIN SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id; +---- +logical_plan +01)Aggregate: groupBy=[[q.id]], aggr=[[]] +02)--SubqueryAlias: q +03)----Projection: id +04)------Unnest: lists[__unnest_placeholder(unnest_prune_t.column2)|depth=1] structs[] +05)--------Projection: unnest_prune_t.column1 AS id, unnest_prune_t.column2 AS __unnest_placeholder(unnest_prune_t.column2) +06)----------TableScan: unnest_prune_t projection=[column1, column2] + +# Count(*) is explicitly multiplicity-sensitive. Unnest must remain. +query TT +EXPLAIN SELECT id, count(*) AS cnt +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id; +---- +logical_plan +01)Projection: q.id, count(Int64(1)) AS count(*) AS cnt +02)--Aggregate: groupBy=[[q.id]], aggr=[[count(Int64(1))]] +03)----SubqueryAlias: q +04)------Projection: id +05)--------Unnest: lists[__unnest_placeholder(unnest_prune_t.column2)|depth=1] structs[] +06)----------Projection: unnest_prune_t.column1 AS id, unnest_prune_t.column2 AS __unnest_placeholder(unnest_prune_t.column2) +07)------------TableScan: unnest_prune_t projection=[column1, column2] + +statement ok +set datafusion.explain.logical_plan_only = false; + +# Correctness check for empty-list/null behavior +query I +SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id +ORDER BY id; +---- +1 +3 + +# Correctness check for multiplicity-sensitive count path +query II +SELECT id, count(*) AS cnt +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id +ORDER BY id; +---- +1 2 +3 1