Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ use crate::query::to_order_by_exprs_with_select;
use crate::utils::{
CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose,
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnests_bottom_up,
resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up,
rewrite_recursive_unnests_bottom_up,
};

use arrow::datatypes::DataType;
use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::ExprSchemable;
use datafusion_expr::builder::get_struct_unnested_columns;
use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
Expand Down Expand Up @@ -463,15 +467,30 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

// expr returned here maybe different from the originals in inner_projection_exprs
// for example:
// - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
// - unnest(array_col) will be transformed into unnest(array_col).element
// - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
&intermediate_select_exprs,
)?;
// - unnest(struct_col) will be transformed into struct_col.field1, struct_col.field2
// - unnest(array_col) will be transformed into array_col.element
// - unnest(array_col) + 1 will be transformed into array_col.element +1
let mut outer_projection_exprs = vec![];
for expr in &intermediate_select_exprs {
let mut rewritten_exprs = rewrite_recursive_unnest_bottom_up(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)?;

if let Some(public_columns) =
self.get_struct_unnest_columns(&intermediate_plan, expr)?
{
rewritten_exprs = rewritten_exprs
.into_iter()
.zip(public_columns)
.map(|(expr, column)| expr.alias(column.flat_name()))
.collect();
}

outer_projection_exprs.extend(rewritten_exprs);
}

// No more unnest is possible
if unnest_columns.is_empty() {
Expand Down Expand Up @@ -516,6 +535,35 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.build()
}

/// Returns the fully-qualified output columns produced by unnesting a
/// struct-typed expression, or `None` when `expr` is not an unnest of a
/// struct.
///
/// `expr` may be a bare `Expr::Unnest` or an `Expr::Unnest` wrapped in a
/// single alias; any other shape — or an operand whose resolved type is
/// not `DataType::Struct` — yields `Ok(None)`.
///
/// Errors are propagated from resolving the operand's field against
/// `input`'s schema.
fn get_struct_unnest_columns(
    &self,
    input: &LogicalPlan,
    expr: &Expr,
) -> Result<Option<Vec<Column>>> {
    // Peel at most one alias layer to reach the underlying expression.
    let candidate = match expr {
        Expr::Alias(alias) => alias.expr.as_ref(),
        other => other,
    };

    let Expr::Unnest(unnest_expr) = candidate else {
        return Ok(None);
    };

    // Only struct-typed operands expand into multiple named columns.
    let (_, field) = unnest_expr.expr.to_field(input.schema())?;
    match field.data_type() {
        DataType::Struct(inner_fields) => Ok(Some(get_struct_unnested_columns(
            &unnest_expr.expr.schema_name().to_string(),
            inner_fields,
        ))),
        _ => Ok(None),
    }
}

fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
match input {
// Fast path if there are no unnest in group by
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ fn test_unnest_logical_plan() -> Result<()> {
assert_snapshot!(
plan,
@r"
Projection: __unnest_placeholder(unnest_table.struct_col).field1, __unnest_placeholder(unnest_table.struct_col).field2, __unnest_placeholder(unnest_table.array_col,depth=1) AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
Projection: __unnest_placeholder(unnest_table.struct_col).field1 AS unnest_table.struct_col.field1, __unnest_placeholder(unnest_table.struct_col).field2 AS unnest_table.struct_col.field2, __unnest_placeholder(unnest_table.array_col,depth=1) AS UNNEST(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
Unnest: lists[__unnest_placeholder(unnest_table.array_col)|depth=1] structs[__unnest_placeholder(unnest_table.struct_col)]
Projection: unnest_table.struct_col AS __unnest_placeholder(unnest_table.struct_col), unnest_table.array_col AS __unnest_placeholder(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
TableScan: unnest_table
Expand Down
15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,20 @@ statement ok
CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6));

query II
select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
select * from (select unnest(column1) from d) where "d.column1.b" > 5;
----
5 6

query TT
explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
explain select * from (select unnest(column1) from d) where "d.column1.b" > 5;
----
physical_plan
01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----UnnestExec
04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
01)ProjectionExec: expr=[__unnest_placeholder(d.column1).a@0 as d.column1.a, __unnest_placeholder(d.column1).b@1 as d.column1.b]
02)--FilterExec: __unnest_placeholder(d.column1).b@1 > 5
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------UnnestExec
05)--------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
drop table d;
Expand Down
68 changes: 37 additions & 31 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ select unnest(struct(1,2,3)) as ignored_alias;
query TTT
describe select unnest(struct(1,2,3)) as ignored_alias;
----
__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c0 Int64 YES
__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c1 Int64 YES
__unnest_placeholder(struct(Int64(1),Int64(2),Int64(3))).c2 Int64 YES
struct(Int64(1),Int64(2),Int64(3)).c0 Int64 YES
struct(Int64(1),Int64(2),Int64(3)).c1 Int64 YES
struct(Int64(1),Int64(2),Int64(3)).c2 Int64 YES

## Basic unnest list expression in from clause
query I
Expand Down Expand Up @@ -608,18 +608,20 @@ query TT
explain select unnest(unnest(column3)), column3 from recursive_unnest_table;
----
logical_plan
01)Unnest: lists[] structs[__unnest_placeholder(UNNEST(recursive_unnest_table.column3))]
02)--Projection: __unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3) AS __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), recursive_unnest_table.column3
03)----Unnest: lists[__unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[]
04)------Projection: recursive_unnest_table.column3 AS __unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3
05)--------TableScan: recursive_unnest_table projection=[column3]
01)Projection: __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c0 AS UNNEST(recursive_unnest_table.column3).c0, __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c1 AS UNNEST(recursive_unnest_table.column3).c1, recursive_unnest_table.column3
02)--Unnest: lists[] structs[__unnest_placeholder(UNNEST(recursive_unnest_table.column3))]
03)----Projection: __unnest_placeholder(recursive_unnest_table.column3,depth=1) AS UNNEST(recursive_unnest_table.column3) AS __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), recursive_unnest_table.column3
04)------Unnest: lists[__unnest_placeholder(recursive_unnest_table.column3)|depth=1] structs[]
05)--------Projection: recursive_unnest_table.column3 AS __unnest_placeholder(recursive_unnest_table.column3), recursive_unnest_table.column3
06)----------TableScan: recursive_unnest_table projection=[column3]
physical_plan
01)UnnestExec
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----ProjectionExec: expr=[__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0 as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), column3@1 as column3]
04)------UnnestExec
05)--------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c0@0 as UNNEST(recursive_unnest_table.column3).c0, __unnest_placeholder(UNNEST(recursive_unnest_table.column3)).c1@1 as UNNEST(recursive_unnest_table.column3).c1, column3@2 as column3]
02)--UnnestExec
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0 as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)), column3@1 as column3]
05)--------UnnestExec
06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]

## unnest->field_access->unnest->unnest
query I?
Expand Down Expand Up @@ -824,9 +826,9 @@ d e {c0: f}
query TTT
describe select unnest(column1) c1 from nested_unnest_table;
----
__unnest_placeholder(nested_unnest_table.column1).c0 Utf8 YES
__unnest_placeholder(nested_unnest_table.column1).c1 Utf8 YES
__unnest_placeholder(nested_unnest_table.column1).c2 Struct("c0": Utf8) YES
nested_unnest_table.column1.c0 Utf8 YES
nested_unnest_table.column1.c1 Utf8 YES
nested_unnest_table.column1.c2 Struct("c0": Utf8) YES

query II??I??
select unnest(column5), * from unnest_table;
Expand Down Expand Up @@ -1098,15 +1100,17 @@ EXPLAIN WITH unnested AS (
) SELECT * FROM unnested order by 1;
----
logical_plan
01)Sort: unnested.__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0 ASC NULLS LAST
01)Sort: unnested.struct(t.column1,t.column2,t.column3).c0 ASC NULLS LAST
02)--SubqueryAlias: unnested
03)----Unnest: lists[] structs[__unnest_placeholder(struct(t.column1,t.column2,t.column3))]
04)------Projection: struct(t.column1, t.column2, t.column3) AS __unnest_placeholder(struct(t.column1,t.column2,t.column3))
05)--------TableScan: t projection=[column1, column2, column3]
03)----Projection: __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0 AS struct(t.column1,t.column2,t.column3).c0, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c1 AS struct(t.column1,t.column2,t.column3).c1, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c2 AS struct(t.column1,t.column2,t.column3).c2
04)------Unnest: lists[] structs[__unnest_placeholder(struct(t.column1,t.column2,t.column3))]
05)--------Projection: struct(t.column1, t.column2, t.column3) AS __unnest_placeholder(struct(t.column1,t.column2,t.column3))
06)----------TableScan: t projection=[column1, column2, column3]
physical_plan
01)SortExec: expr=[__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--UnnestExec
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_tuples.parquet]]}, projection=[struct(column1@0, column2@1, column3@2) as __unnest_placeholder(struct(t.column1,t.column2,t.column3))], file_type=parquet
01)SortExec: expr=[struct(t.column1,t.column2,t.column3).c0@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--ProjectionExec: expr=[__unnest_placeholder(struct(t.column1,t.column2,t.column3)).c0@0 as struct(t.column1,t.column2,t.column3).c0, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c1@1 as struct(t.column1,t.column2,t.column3).c1, __unnest_placeholder(struct(t.column1,t.column2,t.column3)).c2@2 as struct(t.column1,t.column2,t.column3).c2]
03)----UnnestExec
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_tuples.parquet]]}, projection=[struct(column1@0, column2@1, column3@2) as __unnest_placeholder(struct(t.column1,t.column2,t.column3))], file_type=parquet

# cleanup
statement ok
Expand Down Expand Up @@ -1152,12 +1156,14 @@ EXPLAIN SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
----
logical_plan
01)Sort: t.column2 ASC NULLS LAST
02)--Unnest: lists[] structs[__unnest_placeholder(t.column1)]
03)----Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
04)------TableScan: t projection=[column1, column2]
02)--Projection: __unnest_placeholder(t.column1).s1 AS t.column1.s1, __unnest_placeholder(t.column1).s2 AS t.column1.s2, __unnest_placeholder(t.column1).s3 AS t.column1.s3, t.column2
03)----Unnest: lists[] structs[__unnest_placeholder(t.column1)]
04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2
05)--------TableScan: t projection=[column1, column2]
physical_plan
01)UnnestExec
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet
01)ProjectionExec: expr=[__unnest_placeholder(t.column1).s1@0 as t.column1.s1, __unnest_placeholder(t.column1).s2@1 as t.column1.s2, __unnest_placeholder(t.column1).s3@2 as t.column1.s3, column2@3 as column2]
02)--UnnestExec
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet

# cleanup
statement ok
Expand Down Expand Up @@ -1220,12 +1226,12 @@ EXPLAIN SELECT UNNEST(UNNEST(column1)), UNNEST(column2), UNNEST(column3), column
----
logical_plan
01)Sort: t.column4 ASC NULLS LAST
02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3, t.column4
02)--Projection: __unnest_placeholder(t.column1,depth=2) AS UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1) AS UNNEST(t.column2), __unnest_placeholder(t.column3).s1 AS t.column3.s1, __unnest_placeholder(t.column3).s2 AS t.column3.s2, __unnest_placeholder(t.column3).s3 AS t.column3.s3, t.column4
03)----Unnest: lists[__unnest_placeholder(t.column1)|depth=2, __unnest_placeholder(t.column2)|depth=1] structs[__unnest_placeholder(t.column3)]
04)------Projection: t.column1 AS __unnest_placeholder(t.column1), t.column2 AS __unnest_placeholder(t.column2), t.column3 AS __unnest_placeholder(t.column3), t.column4
05)--------TableScan: t projection=[column1, column2, column3, column4]
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as __unnest_placeholder(t.column3).s1, __unnest_placeholder(t.column3).s2@3 as __unnest_placeholder(t.column3).s2, __unnest_placeholder(t.column3).s3@4 as __unnest_placeholder(t.column3).s3, column4@5 as column4]
01)ProjectionExec: expr=[__unnest_placeholder(t.column1,depth=2)@0 as UNNEST(UNNEST(t.column1)), __unnest_placeholder(t.column2,depth=1)@1 as UNNEST(t.column2), __unnest_placeholder(t.column3).s1@2 as t.column3.s1, __unnest_placeholder(t.column3).s2@3 as t.column3.s2, __unnest_placeholder(t.column3).s3@4 as t.column3.s3, column4@5 as column4]
02)--UnnestExec
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/unnest/ordered_struct_arrays.parquet]]}, projection=[column1@0 as __unnest_placeholder(t.column1), column2@1 as __unnest_placeholder(t.column2), column3@2 as __unnest_placeholder(t.column3), column4], output_ordering=[column4@3 ASC NULLS LAST], file_type=parquet

Expand Down
Loading