diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 82cca91d4e8ae..09d8566c4a19e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -24,13 +24,17 @@ use crate::query::to_order_by_exprs_with_select; use crate::utils::{ CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose, check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, - resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnests_bottom_up, + resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up, + rewrite_recursive_unnests_bottom_up, }; +use arrow::datatypes::DataType; use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; +use datafusion_expr::ExprSchemable; +use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, @@ -463,15 +467,30 @@ impl SqlToRel<'_, S> { // expr returned here maybe different from the originals in inner_projection_exprs // for example: - // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 - // - unnest(array_col) will be transformed into unnest(array_col).element - // - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1 - let outer_projection_exprs = rewrite_recursive_unnests_bottom_up( - &intermediate_plan, - &mut unnest_columns, - &mut inner_projection_exprs, - &intermediate_select_exprs, - )?; + // - unnest(struct_col) will be transformed into struct_col.field1, struct_col.field2 + // - unnest(array_col) will be transformed into array_col.element + // - unnest(array_col) + 1 will be transformed into array_col.element +1 + let mut outer_projection_exprs = vec![]; + for expr in &intermediate_select_exprs { + let mut rewritten_exprs = rewrite_recursive_unnest_bottom_up( + &intermediate_plan, + &mut unnest_columns, + &mut inner_projection_exprs, + expr, + )?; + + if let Some(columns) = + self.get_struct_unnest_columns(&intermediate_plan, expr)? + { + rewritten_exprs = rewritten_exprs + .into_iter() + .zip(columns) + .map(|(expr, column)| expr.alias(column.flat_name())) + .collect(); + } + + outer_projection_exprs.extend(rewritten_exprs); + } // No more unnest is possible if unnest_columns.is_empty() { @@ -516,6 +535,35 @@ impl SqlToRel<'_, S> { .build() } + fn get_struct_unnest_columns( + &self, + input: &LogicalPlan, + expr: &Expr, + ) -> Result>> { + let unnest_expr = match expr { + Expr::Unnest(unnest_expr) => Some(unnest_expr), + Expr::Alias(alias) => match alias.expr.as_ref() { + Expr::Unnest(unnest_expr) => Some(unnest_expr), + _ => None, + }, + _ => None, + }; + + let Some(unnest_expr) = unnest_expr else { + return Ok(None); + }; + + let field = unnest_expr.expr.to_field(input.schema())?.1; + let DataType::Struct(inner_fields) = field.data_type() else { + return Ok(None); + }; + + Ok(Some(get_struct_unnested_columns( + &unnest_expr.expr.schema_name().to_string(), + inner_fields, + ))) + } + fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result { match input { // Fast path if there are no unnest in group by diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 06680e60714b8..1da63431e0246 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -987,7 +987,7 @@ fn test_unnest_logical_plan() -> Result<()> { assert_snapshot!( plan, @r" - Projection: __unnest_placeholder(unnest_table.struct_col).field1, __unnest_placeholder(unnest_table.struct_col).field2, __unnest_placeholder(unnest_table.array_col,depth=1) AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col + Projection: __unnest_placeholder(unnest_table.struct_col).field1 AS unnest_table.struct_col.field1, __unnest_placeholder(unnest_table.struct_col).field2 AS unnest_table.struct_col.field2, __unnest_placeholder(unnest_table.array_col,depth=1) AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col Unnest: lists[__unnest_placeholder(unnest_table.array_col)|depth=1] structs[__unnest_placeholder(unnest_table.struct_col)] Projection: unnest_table.struct_col AS __unnest_placeholder(unnest_table.struct_col), unnest_table.array_col AS __unnest_placeholder(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col TableScan: unnest_table diff --git a/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt index 704ad326cc230..1921fbfb4316c 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt @@ -130,19 +130,20 @@ statement ok CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6)); query II -select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; +select * from (select unnest(column1) from d) where "d.column1.b" > 5; ---- 5 6 query TT -explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; +explain select * from (select unnest(column1) from d) where "d.column1.b" > 5; ---- physical_plan -01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5 -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----UnnestExec -04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)] -05)--------DataSourceExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[__unnest_placeholder(d.column1).a@0 as d.column1.a, __unnest_placeholder(d.column1).b@1 as d.column1.b] +02)--FilterExec: __unnest_placeholder(d.column1).b@1 > 5 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------UnnestExec +05)--------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] statement ok drop table d; diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 8cfc01380d4b2..ac01b7d1f7c6c 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -68,9 +68,9 @@ select unnest(struct(1,2,3)) as ignored_alias; query TTT describe select unnest(struct(1,2,3)) as ignored_alias; ---- -__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c0 Int64 YES -__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c1 Int64 YES -__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c2 Int64 YES +struct(Int64(1),Int64(2),Int64(3)).c0 Int64 YES +struct(Int64(1),Int64(2),Int64(3)).c1 Int64 YES +struct(Int64(1),Int64(2),Int64(3)).c2 Int64 YES ## Basic unnest list expression in from clause query I @@ -608,18 +608,20 @@ query TT explain select unnest(unnest(column3)), column3 from recursive_unnest_table; ---- logical_plan -01)Unnest: lists[] structs[__unnest_placeholder(UNNEST(recursive_unnest_table.column3))] -02)--Projection: __unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3) AS __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), recursive_unnest_table.column3 -03)----Unnest: lists[__unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[] -04)------Projection: recursive_unnest_table.column3 AS __unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3 -05)--------TableScan: recursive_unnest_table projection=[column3] +01)Projection: __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c0 AS UNNEST(recursive_unnest_table.column3).c0, __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c1 AS UNNEST(recursive_unnest_table.column3).c1, recursive_unnest_table.column3 +02)--Unnest: lists[] structs[__unnest_placeholder(UNNEST(recursive_unnest_table.column3))] +03)----Projection: __unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3) AS __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), recursive_unnest_table.column3 +04)------Unnest: lists[__unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[] +05)--------Projection: recursive_unnest_table.column3 AS __unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3 +06)----------TableScan: recursive_unnest_table projection=[column3] physical_plan -01)UnnestExec -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----ProjectionExec: expr=[__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0 as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), column3@1 as column3] -04)------UnnestExec -05)--------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3] -06)----------DataSourceExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c0@0 as UNNEST(recursive_unnest_table.column3).c0, __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c1@1 as UNNEST(recursive_unnest_table.column3).c1, column3@2 as column3] +02)--UnnestExec +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------ProjectionExec: expr=[__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0 as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), column3@1 as column3] +05)--------UnnestExec +06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3] +07)------------DataSourceExec: partitions=1, partition_sizes=[1] ## unnest->field_access->unnest->unnest query I? @@ -824,9 +826,9 @@ d e {c0: f} query TTT describe select unnest(column1) c1 from nested_unnest_table; ---- -__unnest_placeholder(nested_unnest_table.column1).c0 Utf8 YES -__unnest_placeholder(nested_unnest_table.column1).c1 Utf8 YES -__unnest_placeholder(nested_unnest_table.column1).c2 Struct("c0": Utf8) YES +nested_unnest_table.column1.c0 Utf8 YES +nested_unnest_table.column1.c1 Utf8 YES +nested_unnest_table.column1.c2 Struct("c0": Utf8) YES query II??I?? select unnest(column5), * from unnest_table; @@ -1098,15 +1100,17 @@ EXPLAIN WITH unnested AS ( ) SELECT * FROM unnested order by 1; ---- logical_plan -01)Sort: unnested.__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0 ASC NULLS LAST +01)Sort: unnested.struct(t.column1,t.column2,t.column3).c0 ASC NULLS LAST 02)--SubqueryAlias: unnested -03)----Unnest: lists[] structs[__unnest_placeholder(struct(t.column1,t.column2,t.column3))] -04)------Projection: struct(t.column1, t.column2, t.column3) AS __unnest_placeholder(struct(t.column1,t.column2,t.column3)) -05)--------TableScan: t projection=[column1, column2, column3] +03)----Projection: __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0 AS struct(t.column1,t.column2,t.column3).c0, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c1 AS struct(t.column1,t.column2,t.column3).c1, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c2 AS struct(t.column1,t.column2,t.column3).c2 +04)------Unnest: lists[] structs[__unnest_placeholder(struct(t.column1,t.column2,t.column3))] +05)--------Projection: struct(t.column1, t.column2, t.column3) AS __unnest_placeholder(struct(t.column1,t.column2,t.column3)) +06)----------TableScan: t projection=[column1, column2, column3] physical_plan -01)SortExec: expr=[__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--UnnestExec -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_tuples.parquet]]}, projection=[struct(column1@0, column2@1, column3@2) as __unnest_placeholder(struct(t.column1,t.column2,t.column3))], file_type=parquet +01)SortExec: expr=[struct(t.column1,t.column2,t.column3).c0@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0@0 as struct(t.column1,t.column2,t.column3).c0, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c1@1 as struct(t.column1,t.column2,t.column3).c1, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c2@2 as struct(t.column1,t.column2,t.column3).c2] +03)----UnnestExec +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_tuples.parquet]]}, projection=[struct(column1@0, column2@1, column3@2) as __unnest_placeholder(struct(t.column1,t.column2,t.column3))], file_type=parquet # cleanup statement ok @@ -1152,12 +1156,14 @@ EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2; ---- logical_plan 01)Sort: t.column2 ASC NULLS LAST -02)--Unnest: lists[] structs[__unnest_placeholder(t.column1)] -03)----Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 -04)------TableScan: t projection=[column1, column2] +02)--Projection: __unnest_placeholder(t.column1).s1 AS t.column1.s1, __unnest_placeholder(t.column1).s2 AS t.column1.s2, __unnest_placeholder(t.column1).s3 AS t.column1.s3, t.column2 +03)----Unnest: lists[] structs[__unnest_placeholder(t.column1)] +04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 +05)--------TableScan: t projection=[column1, column2] physical_plan -01)UnnestExec -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet +01)ProjectionExec: expr=[__unnest_placeholder(t.column1).s1@0 as t.column1.s1, __unnest_placeholder(t.column1).s2@1 as t.column1.s2, __unnest_placeholder(t.column1).s3@2 as t.column1.s3, column2@3 as column2] +02)--UnnestExec +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet # cleanup statement ok @@ -1220,12 +1226,12 @@ EXPLAIN SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column ---- logical_plan 01)Sort: t.column4 ASC NULLS LAST -02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3, t.column4 +02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1 AS t.column3.s1, __unnest_placeholder(t.column3).s2 AS t.column3.s2, __unnest_placeholder(t.column3).s3 AS t.column3.s3, t.column4 03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=2, __unnest_placeholder(t.column2)|depth=1] structs[__unnest_placeholder(t.column3)] 04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 AS __unnest_placeholder(t.column2), t.column3 AS __unnest_placeholder(t.column3), t.column4 05)--------TableScan: t projection=[column1, column2, column3, column4] physical_plan -01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2@3 as __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3@4 as __unnest_placeholder(t.column3).s3, column4@5 as column4] +01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as t.column3.s1, __unnest_placeholder(t.column3).s2@3 as t.column3.s2, __unnest_placeholder(t.column3).s3@4 as t.column3.s3, column4@5 as column4] 02)--UnnestExec 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct_arrays.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2@1 as __unnest_placeholder(t.column2), column3@2 as __unnest_placeholder(t.column3), column4], output_ordering=[column4@3 ASC NULLS LAST], file_type=parquet diff --git a/docs/source/user-guide/sql/special_functions.md b/docs/source/user-guide/sql/special_functions.md index 4f2a39f642b06..b7f5d3cab659c 100644 --- a/docs/source/user-guide/sql/special_functions.md +++ b/docs/source/user-guide/sql/special_functions.md @@ -69,7 +69,7 @@ Expands an array or map into rows. ### `unnest (struct)` Expand a struct fields into individual columns. -Each field of the struct will be prefixed with `__unnest_placeholder` and could be accessed via `"__unnest_placeholder()."`. +Each field of the struct could be accessed via `".."`. #### Arguments @@ -93,7 +93,7 @@ Each field of the struct will be prefixed with `__unnest_placeholder` and could > select unnest(struct_column) from foov; +--------------------------------------------+--------------------------------------------+ -| __unnest_placeholder(foov.struct_column).a | __unnest_placeholder(foov.struct_column).b | +| foov.struct_column.a | foov.struct_column.b | +--------------------------------------------+--------------------------------------------+ | 5 | a string | | 6 | another string |