From 75cd4d79021e0a0bec6e245e1912b64202e8c209 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 22 Mar 2026 00:02:39 -0400 Subject: [PATCH 01/14] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. --- datafusion/sql/src/unparser/plan.rs | 37 ++++++++++++++++ datafusion/sql/tests/cases/plan_to_sql.rs | 54 ++++++++++++++++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index ca8dfa431b4f5..73950b961e8ba 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -828,6 +828,27 @@ impl Unparser<'_> { Some(plan_alias.alias.clone()), select.already_projected(), )?; + + // If the SubqueryAlias directly wraps a plan that builds its + // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, + // we must emit a derived subquery: (SELECT ...) AS alias. + // Without this, the recursive handler would merge those clauses + // into the outer SELECT, losing the subquery structure entirely. + if unparsed_table_scan.is_none() + && Self::requires_derived_subquery(plan_alias.input.as_ref()) + { + return self.derive( + &plan_alias.input, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + columns, + )), + false, + ); + } + // if the child plan is a TableScan with pushdown operations, we don't need to // create an additional subquery for it if !select.already_projected() && unparsed_table_scan.is_none() { @@ -1060,6 +1081,22 @@ impl Unparser<'_> { scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some() } + /// Returns true if a plan, when used as the direct child of a SubqueryAlias, + /// must be emitted as a derived subquery `(SELECT ...) AS alias`. + /// + /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, + /// window functions). + fn requires_derived_subquery(plan: &LogicalPlan) -> bool { + matches!( + plan, + LogicalPlan::Aggregate(_) + | LogicalPlan::Window(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Union(_) + ) + } + /// Try to unparse a table scan with pushdown operations into a new subquery plan. /// If the table scan is without any pushdown operations, return None. fn unparse_table_scan_pushdown( diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index aefb404ba4106..24f9226636455 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -23,7 +23,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{WindowFunction, WindowFunctionParams}; use datafusion_expr::test::function_stub::{ - count_udaf, max_udaf, min_udaf, sum, sum_udaf, + count_udaf, max, max_udaf, min_udaf, sum, sum_udaf, }; use datafusion_expr::{ EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union, @@ -2893,3 +2893,55 @@ fn test_json_access_3() { @r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"# ); } + +/// Test that unparsing a manually constructed join with a subquery aggregate +/// preserves the MAX aggregate function. +/// +/// Builds the equivalent of: +/// SELECT j1.j1_string FROM j1 +/// JOIN (SELECT max(j2_id) AS max_id FROM j2) AS b +/// ON j1.j1_id = b.max_id +#[test] +fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + let j2_schema = context + .get_table_source(TableReference::bare("j2"))? + .schema(); + + // Build the right side: SELECT max(j2_id) AS max_id FROM j2 + let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; + let right_agg = LogicalPlanBuilder::from(right_scan) + .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .build()?; + let right_subquery = subquery_alias(right_agg, "b")?; + + // Build the full plan: SELECT j1.j1_string FROM j1 JOIN (...) AS b ON j1.j1_id = b.max_id + let left_scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let plan = LogicalPlanBuilder::from(left_scan) + .join( + right_subquery, + datafusion_expr::JoinType::Inner, + ( + vec![Column::from_qualified_name("j1.j1_id")], + vec![Column::from_qualified_name("b.max_id")], + ), + None, + )? + .project(vec![col("j1.j1_string")])? + .build()?; + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?.to_string(); + let sql_upper = sql.to_uppercase(); + assert!( + sql_upper.contains("MAX("), + "Unparsed SQL should preserve the MAX aggregate function call, got: {sql}" + ); + + Ok(()) +} From 0d223f9fea321ca4667aca57c5bca483cc201aa9 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 27 Mar 2026 12:41:57 -0400 Subject: [PATCH 02/14] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/plan.rs | 2 +- datafusion/sql/tests/cases/plan_to_sql.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 73950b961e8ba..0b9f24bc87327 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -1085,7 +1085,7 @@ impl Unparser<'_> { /// must be emitted as a derived subquery `(SELECT ...) AS alias`. /// /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, - /// window functions). + /// window functions). fn requires_derived_subquery(plan: &LogicalPlan) -> bool { matches!( plan, diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 24f9226636455..db94e32c8d913 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2916,7 +2916,10 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { // Build the right side: SELECT max(j2_id) AS max_id FROM j2 let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; let right_agg = LogicalPlanBuilder::from(right_scan) - .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .aggregate( + vec![] as Vec, + vec![max(col("j2.j2_id")).alias("max_id")], + )? .build()?; let right_subquery = subquery_alias(right_agg, "b")?; From 42f7f64e6a9cd400dc03e10a498bc32ffd67be57 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 3 Apr 2026 22:21:44 -0400 Subject: [PATCH 03/14] Fixes in PR --- datafusion/sql/src/unparser/plan.rs | 33 +++++++++++++++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 14 ++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 0b9f24bc87327..c0b77b9dba88a 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -829,17 +829,40 @@ impl Unparser<'_> { select.already_projected(), )?; - // If the SubqueryAlias directly wraps a plan that builds its - // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // If the (possibly rewritten) inner plan builds its own + // SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, // we must emit a derived subquery: (SELECT ...) AS alias. // Without this, the recursive handler would merge those clauses // into the outer SELECT, losing the subquery structure entirely. - if unparsed_table_scan.is_none() - && Self::requires_derived_subquery(plan_alias.input.as_ref()) + if unparsed_table_scan.is_none() && Self::requires_derived_subquery(plan) { + // When the dialect does not support column aliases in + // table aliases (e.g. SQLite), inject the aliases into + // the inner projection before wrapping as a derived + // subquery. + if !columns.is_empty() + && !self.dialect.supports_column_alias_in_table_alias() + { + let Ok(rewritten_plan) = + inject_column_aliases_into_subquery(plan.clone(), columns) + else { + return internal_err!( + "Failed to transform SubqueryAlias plan" + ); + }; + return self.derive( + &rewritten_plan, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + vec![], + )), + false, + ); + } return self.derive( - &plan_alias.input, + plan, relation, Some(self.new_table_alias( plan_alias.alias.table().to_string(), diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index db94e32c8d913..9aff555825776 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2894,6 +2894,20 @@ fn test_json_access_3() { ); } +/// Roundtrip test for a subquery aggregate with column aliases. +/// Ensures that `subquery_alias_inner_query_and_columns` unwrapping +/// a Projection -> Aggregate still triggers the derived-subquery path. +#[test] +fn roundtrip_subquery_aggregate_with_column_alias() -> Result<(), DataFusionError> { + roundtrip_statement_with_dialect_helper!( + sql: "SELECT id FROM (SELECT max(j1_id) FROM j1) AS c(id)", + parser_dialect: GenericDialect {}, + unparser_dialect: UnparserDefaultDialect {}, + expected: @"SELECT c.id FROM (SELECT max(j1.j1_id) FROM j1) AS c (id)", + ); + Ok(()) +} + /// Test that unparsing a manually constructed join with a subquery aggregate /// preserves the MAX aggregate function. /// From 8d95d48a618427cb8a74483fa0d050f639913396 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 10 Apr 2026 20:35:59 -0400 Subject: [PATCH 04/14] test: define correct expected output for Snowflake FLATTEN unparsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing tests pass with broken SQL output — the SELECT list still uses DataFusion internal names (__unnest_placeholder) instead of Snowflake's alias.VALUE convention. Update expectations to the correct Snowflake SQL so these tests will drive the implementation. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/tests/cases/plan_to_sql.rs | 39 ++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 923812138c016..f84383d00b624 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -39,7 +39,7 @@ use datafusion_sql::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_sql::unparser::dialect::{ BigQueryDialect, CustomDialectBuilder, DefaultDialect as UnparserDefaultDialect, DefaultDialect, Dialect as UnparserDialect, MySqlDialect as UnparserMySqlDialect, - PostgreSqlDialect as UnparserPostgreSqlDialect, SqliteDialect, + PostgreSqlDialect as UnparserPostgreSqlDialect, SnowflakeDialect, SqliteDialect, }; use datafusion_sql::unparser::{Unparser, expr_to_sql, plan_to_sql}; use insta::assert_snapshot; @@ -2995,3 +2995,40 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { Ok(()) } + +#[test] +fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3])", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT _unnest."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + ); + Ok(()) +} + +#[test] +fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusionError> +{ + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3]), j1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT _unnest."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest CROSS JOIN "j1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_unnest_to_lateral_flatten_outer_ref() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "u"."array_col", "u"."struct_col", _unnest."VALUE" FROM "unnest_table" AS "u" CROSS JOIN LATERAL FLATTEN(INPUT => "u"."array_col") AS _unnest"#, + ); + Ok(()) +} From 6acaa9cabe7d3742d14a0d68692c6cd177e69215 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 10 Apr 2026 23:17:04 -0400 Subject: [PATCH 05/14] Working on unnest support for snowflake --- datafusion/sql/src/unparser/ast.rs | 99 ++++++++++++++- datafusion/sql/src/unparser/dialect.rs | 79 ++++++++++++ datafusion/sql/src/unparser/plan.rs | 148 ++++++++++++++++++++-- datafusion/sql/src/unparser/utils.rs | 30 ++++- datafusion/sql/tests/cases/plan_to_sql.rs | 78 +++++++++++- 5 files changed, 417 insertions(+), 17 deletions(-) diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index 8446a44b07e35..b49313f9179e5 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -432,11 +432,11 @@ pub struct RelationBuilder { } #[derive(Clone)] -#[expect(clippy::large_enum_variant)] enum TableFactorBuilder { Table(TableRelationBuilder), Derived(DerivedRelationBuilder), Unnest(UnnestRelationBuilder), + Flatten(FlattenRelationBuilder), Empty, } @@ -458,6 +458,11 @@ impl RelationBuilder { self } + pub fn flatten(&mut self, value: FlattenRelationBuilder) -> &mut Self { + self.relation = Some(TableFactorBuilder::Flatten(value)); + self + } + pub fn empty(&mut self) -> &mut Self { self.relation = Some(TableFactorBuilder::Empty); self @@ -474,6 +479,9 @@ impl RelationBuilder { Some(TableFactorBuilder::Unnest(ref mut rel_builder)) => { rel_builder.alias = value; } + Some(TableFactorBuilder::Flatten(ref mut rel_builder)) => { + rel_builder.alias = value; + } Some(TableFactorBuilder::Empty) => (), None => (), } @@ -484,6 +492,7 @@ impl RelationBuilder { Some(TableFactorBuilder::Table(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Derived(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Unnest(ref value)) => Some(value.build()?), + Some(TableFactorBuilder::Flatten(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Empty) => None, None => return Err(Into::into(UninitializedFieldError::from("relation"))), }) @@ -688,6 +697,94 @@ impl Default for UnnestRelationBuilder { } } +/// Default table alias for FLATTEN table factors. +/// Snowflake requires an alias to reference output columns (e.g. `_unnest.VALUE`). +pub const FLATTEN_DEFAULT_ALIAS: &str = "_unnest"; + +/// Builds a `LATERAL FLATTEN(INPUT => expr, OUTER => bool)` table factor +/// for Snowflake-style unnesting. +#[derive(Clone)] +pub struct FlattenRelationBuilder { + pub alias: Option, + /// The input expression to flatten (e.g. a column reference). + pub input_expr: Option, + /// Whether to preserve rows for NULL/empty inputs (Snowflake `OUTER` param). + pub outer: bool, +} + +impl FlattenRelationBuilder { + pub fn alias(&mut self, value: Option) -> &mut Self { + self.alias = value; + self + } + + pub fn input_expr(&mut self, value: ast::Expr) -> &mut Self { + self.input_expr = Some(value); + self + } + + pub fn outer(&mut self, value: bool) -> &mut Self { + self.outer = value; + self + } + + pub fn build(&self) -> Result { + let input = self.input_expr.clone().ok_or_else(|| { + BuilderError::from(UninitializedFieldError::from("input_expr")) + })?; + + let mut args = vec![ast::FunctionArg::Named { + name: ast::Ident::new("INPUT"), + arg: ast::FunctionArgExpr::Expr(input), + operator: ast::FunctionArgOperator::RightArrow, + }]; + + if self.outer { + args.push(ast::FunctionArg::Named { + name: ast::Ident::new("OUTER"), + arg: ast::FunctionArgExpr::Expr(ast::Expr::Value( + ast::Value::Boolean(true).into(), + )), + operator: ast::FunctionArgOperator::RightArrow, + }); + } + + Ok(ast::TableFactor::Function { + lateral: true, + name: ast::ObjectName::from(vec![ast::Ident::new("FLATTEN")]), + args, + alias: self.alias.clone(), + }) + } + + /// Returns the alias name for this FLATTEN relation. + /// Used to build qualified column references like `alias.VALUE`. + pub fn alias_name(&self) -> &str { + self.alias + .as_ref() + .map(|a| a.name.value.as_str()) + .unwrap_or(FLATTEN_DEFAULT_ALIAS) + } + + fn create_empty() -> Self { + Self { + alias: Some(ast::TableAlias { + name: ast::Ident::new(FLATTEN_DEFAULT_ALIAS), + columns: vec![], + explicit: true, + }), + input_expr: None, + outer: false, + } + } +} + +impl Default for FlattenRelationBuilder { + fn default() -> Self { + Self::create_empty() + } +} + /// Runtime error when a `build()` method is called and one or more required fields /// do not have a value. #[derive(Debug, Clone)] diff --git a/datafusion/sql/src/unparser/dialect.rs b/datafusion/sql/src/unparser/dialect.rs index ee31190b68b98..d6287a2084fac 100644 --- a/datafusion/sql/src/unparser/dialect.rs +++ b/datafusion/sql/src/unparser/dialect.rs @@ -206,6 +206,15 @@ pub trait Dialect: Send + Sync { false } + /// Unparse the unnest plan as `LATERAL FLATTEN(INPUT => expr, ...)`. + /// + /// Snowflake uses FLATTEN as a table function instead of the SQL-standard UNNEST. + /// When this returns `true`, the unparser emits + /// `LATERAL FLATTEN(INPUT => , OUTER => )` in the FROM clause. + fn unnest_as_lateral_flatten(&self) -> bool { + false + } + /// Allows the dialect to override column alias unparsing if the dialect has specific rules. /// Returns None if the default unparsing should be used, or Some(String) if there is /// a custom implementation for the alias. @@ -664,6 +673,59 @@ impl BigQueryDialect { } } +/// Dialect for Snowflake SQL. +/// +/// Key differences from the default dialect: +/// - Uses double-quote identifier quoting +/// - Supports `NULLS FIRST`/`NULLS LAST` in `ORDER BY` +/// - Does not support empty select lists (`SELECT FROM t`) +/// - Does not support column aliases in table alias definitions +/// (Snowflake accepts the syntax but silently ignores the renames in join contexts) +/// - Unparses `UNNEST` plans as `LATERAL FLATTEN(INPUT => expr, ...)` +pub struct SnowflakeDialect {} + +#[expect(clippy::new_without_default)] +impl SnowflakeDialect { + #[must_use] + pub fn new() -> Self { + Self {} + } +} + +impl Dialect for SnowflakeDialect { + fn identifier_quote_style(&self, _: &str) -> Option { + Some('"') + } + + fn supports_nulls_first_in_sort(&self) -> bool { + true + } + + fn supports_empty_select_list(&self) -> bool { + false + } + + fn supports_column_alias_in_table_alias(&self) -> bool { + false + } + + fn timestamp_cast_dtype( + &self, + _time_unit: &TimeUnit, + tz: &Option>, + ) -> ast::DataType { + if tz.is_some() { + ast::DataType::Timestamp(None, TimezoneInfo::WithTimeZone) + } else { + ast::DataType::Timestamp(None, TimezoneInfo::None) + } + } + + fn unnest_as_lateral_flatten(&self) -> bool { + true + } +} + pub struct CustomDialect { identifier_quote_style: Option, supports_nulls_first_in_sort: bool, @@ -686,6 +748,7 @@ pub struct CustomDialect { window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, + unnest_as_lateral_flatten: bool, } impl Default for CustomDialect { @@ -715,6 +778,7 @@ impl Default for CustomDialect { window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, + unnest_as_lateral_flatten: false, } } } @@ -829,6 +893,10 @@ impl Dialect for CustomDialect { fn unnest_as_table_factor(&self) -> bool { self.unnest_as_table_factor } + + fn unnest_as_lateral_flatten(&self) -> bool { + self.unnest_as_lateral_flatten + } } /// `CustomDialectBuilder` to build `CustomDialect` using builder pattern @@ -867,6 +935,7 @@ pub struct CustomDialectBuilder { window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, + unnest_as_lateral_flatten: bool, } impl Default for CustomDialectBuilder { @@ -902,6 +971,7 @@ impl CustomDialectBuilder { window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, + unnest_as_lateral_flatten: false, } } @@ -929,6 +999,7 @@ impl CustomDialectBuilder { window_func_support_window_frame: self.window_func_support_window_frame, full_qualified_col: self.full_qualified_col, unnest_as_table_factor: self.unnest_as_table_factor, + unnest_as_lateral_flatten: self.unnest_as_lateral_flatten, } } @@ -1075,4 +1146,12 @@ impl CustomDialectBuilder { self.unnest_as_table_factor = unnest_as_table_factor; self } + + pub fn with_unnest_as_lateral_flatten( + mut self, + unnest_as_lateral_flatten: bool, + ) -> Self { + self.unnest_as_lateral_flatten = unnest_as_lateral_flatten; + self + } } diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index c0b77b9dba88a..e08613e50e889 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -29,21 +29,25 @@ use super::{ utils::{ find_agg_node_within_select, find_unnest_node_within_select, find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters, - unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs, + unproject_sort_expr, unproject_unnest_expr, + unproject_unnest_expr_as_flatten_value, unproject_window_exprs, }, }; use crate::unparser::extension_unparser::{ UnparseToStatementResult, UnparseWithinStatementResult, }; use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs}; -use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify}; +use crate::unparser::{ + ast::FLATTEN_DEFAULT_ALIAS, ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, + rewrite::rewrite_qualify, +}; use crate::utils::UNNEST_PLACEHOLDER; use datafusion_common::{ Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err, internal_err, not_impl_err, tree_node::{TransformedResult, TreeNode}, }; -use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX; +use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX}; use datafusion_expr::{ BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest, @@ -236,10 +240,23 @@ impl Unparser<'_> { // If an Unnest node is found within the select, find and unproject the unnest column if let Some(unnest) = find_unnest_node_within_select(plan) { - exprs = exprs - .into_iter() - .map(|e| unproject_unnest_expr(e, unnest)) - .collect::>>()?; + if self.dialect.unnest_as_lateral_flatten() { + exprs = exprs + .into_iter() + .map(|e| { + unproject_unnest_expr_as_flatten_value( + e, + unnest, + FLATTEN_DEFAULT_ALIAS, + ) + }) + .collect::>>()?; + } else { + exprs = exprs + .into_iter() + .map(|e| unproject_unnest_expr(e, unnest)) + .collect::>>()?; + } }; match ( @@ -277,9 +294,35 @@ impl Unparser<'_> { select.projection(items); } _ => { + let use_flatten = self.dialect.unnest_as_lateral_flatten(); let items = exprs .iter() - .map(|e| self.select_item_to_sql(e)) + .map(|e| { + if use_flatten { + // For Snowflake FLATTEN: rewrite UNNEST output columns + // to `_unnest."VALUE"`. After unproject_unnest_expr_as_flatten_value + // runs, the expression is either: + // - bare Column with UNNEST prefix (outer ref case) + // - Alias { name: "UNNEST(...)", expr: Column(_unnest.VALUE) } + // (the inner column is already rewritten but the alias preserves + // the internal UNNEST display name) + let is_unnest_col = match e { + Expr::Column(col) => col + .name + .starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")), + Expr::Alias(Alias { name, .. }) => { + name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) + } + _ => false, + }; + if is_unnest_col { + return Ok(self.build_flatten_value_select_item( + FLATTEN_DEFAULT_ALIAS, + )); + } + } + self.select_item_to_sql(e) + }) .collect::>>()?; select.projection(items); } @@ -384,6 +427,34 @@ impl Unparser<'_> { } else { None }; + if self.dialect.unnest_as_lateral_flatten() + && unnest_input_type.is_some() + && let LogicalPlan::Unnest(unnest) = p.input.as_ref() + && let Some(flatten_relation) = + self.try_unnest_to_lateral_flatten_sql(unnest)? + { + let alias_name = flatten_relation.alias_name().to_string(); + relation.flatten(flatten_relation); + + // Only set the projection if it hasn't already been set by + // an outer Projection (e.g. SELECT * FROM t, UNNEST(...)). + // When the outer Projection already called + // reconstruct_select_statement, the SELECT list is correct + // and we must not replace it with just `_unnest."VALUE"`. + if !select.already_projected() { + let value_expr = + self.build_flatten_value_select_item(&alias_name); + select.projection(vec![value_expr]); + select.already_projected(); + } + + return self.select_to_sql_recursively( + p.input.as_ref(), + query, + select, + relation, + ); + } if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() && let LogicalPlan::Unnest(unnest) = &p.input.as_ref() @@ -1001,12 +1072,35 @@ impl Unparser<'_> { } } LogicalPlan::Unnest(unnest) => { + if self.dialect.unnest_as_lateral_flatten() + && !unnest.struct_type_columns.is_empty() + { + return not_impl_err!( + "Snowflake FLATTEN cannot unparse struct unnest: \ + DataFusion expands struct fields into columns (horizontal), \ + but Snowflake FLATTEN expands them into rows (vertical). \ + Columns: {:?}", + unnest.struct_type_columns + ); + } + assert_or_internal_err!( unnest.struct_type_columns.is_empty(), "Struct type columns are not currently supported in UNNEST: {:?}", unnest.struct_type_columns ); + // For Snowflake FLATTEN: if the relation hasn't been set yet + // (UNNEST was in SELECT clause, not FROM clause), set the FLATTEN + // relation here so the FROM clause is emitted. + if self.dialect.unnest_as_lateral_flatten() + && !relation.has_relation() + && let Some(flatten_relation) = + self.try_unnest_to_lateral_flatten_sql(unnest)? + { + relation.flatten(flatten_relation); + } + // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip. // Otherwise, there will be a duplicate SELECT clause. // | Projection: table.col1, UNNEST(table.col2) @@ -1025,7 +1119,9 @@ impl Unparser<'_> { if find_unnest_node_until_relation(subquery.subquery.as_ref()) .is_some() => { - if self.dialect.unnest_as_table_factor() { + if self.dialect.unnest_as_table_factor() + || self.dialect.unnest_as_lateral_flatten() + { self.select_to_sql_recursively( subquery.subquery.as_ref(), query, @@ -1100,6 +1196,40 @@ impl Unparser<'_> { Ok(Some(unnest_relation)) } + /// Build a `SELECT alias."VALUE"` item for Snowflake FLATTEN output. + fn build_flatten_value_select_item(&self, alias_name: &str) -> ast::SelectItem { + let compound = ast::Expr::CompoundIdentifier(vec![ + self.new_ident_without_quote_style(alias_name.to_string()), + Ident::with_quote('"', "VALUE"), + ]); + ast::SelectItem::UnnamedExpr(compound) + } + + /// Convert an `Unnest` logical plan node to a `LATERAL FLATTEN(INPUT => expr, ...)` + /// table factor for Snowflake-style SQL output. + fn try_unnest_to_lateral_flatten_sql( + &self, + unnest: &Unnest, + ) -> Result> { + let LogicalPlan::Projection(projection) = unnest.input.as_ref() else { + return Ok(None); + }; + + // For now, handle the simple case of a single expression to flatten. + // Multi-expression would require multiple LATERAL FLATTEN calls chained together. + let Some(first_expr) = projection.expr.first() else { + return Ok(None); + }; + + let input_expr = self.expr_to_sql(first_expr)?; + + let mut flatten = FlattenRelationBuilder::default(); + flatten.input_expr(input_expr); + flatten.outer(unnest.options.preserve_nulls); + + Ok(Some(flatten)) + } + fn is_scan_with_pushdown(scan: &TableScan) -> bool { scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some() } diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index f539c0ddc1e87..3657516d534aa 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -22,8 +22,8 @@ use super::{ rewrite::TableAliasRewriter, }; use datafusion_common::{ - Column, DataFusionError, Result, ScalarValue, assert_eq_or_internal_err, - internal_err, + Column, DataFusionError, Result, ScalarValue, TableReference, + assert_eq_or_internal_err, internal_err, tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::{ @@ -183,6 +183,32 @@ pub(crate) fn unproject_unnest_expr(expr: Expr, unnest: &Unnest) -> Result }).map(|e| e.data) } +/// Like `unproject_unnest_expr`, but for Snowflake FLATTEN: +/// transforms `__unnest_placeholder(...)` column references into +/// `Expr::Column(Column { relation: Some(alias), name: "VALUE" })`. +pub(crate) fn unproject_unnest_expr_as_flatten_value( + expr: Expr, + unnest: &Unnest, + flatten_alias: &str, +) -> Result { + expr.transform(|sub_expr| { + if let Expr::Column(col_ref) = &sub_expr + && unnest + .list_type_columns + .iter() + .any(|e| e.1.output_column.name == col_ref.name) + { + let value_col = Expr::Column(Column::new( + Some(TableReference::bare(flatten_alias)), + "VALUE", + )); + return Ok(Transformed::yes(value_col)); + } + Ok(Transformed::no(sub_expr)) + }) + .map(|e| e.data) +} + /// Recursively identify all Column expressions and transform them into the appropriate /// aggregate expression contained in agg. /// diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index f84383d00b624..141f7ec40dcd3 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3009,8 +3009,7 @@ fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> { } #[test] -fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusionError> -{ +fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusionError> { let snowflake = SnowflakeDialect::new(); roundtrip_statement_with_dialect_helper!( sql: "SELECT * FROM UNNEST([1,2,3]), j1", @@ -3022,13 +3021,82 @@ fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusio } #[test] -fn snowflake_unnest_to_lateral_flatten_outer_ref() -> Result<(), DataFusionError> { +fn snowflake_unnest_to_lateral_flatten_cross_join_inline() -> Result<(), DataFusionError> +{ + // Cross join with two inline UNNEST sources — both produce valid FLATTEN. + // NOTE: UNNEST(table.column) is NOT tested with Snowflake because + // LATERAL FLATTEN(INPUT => col) requires the column to be a Snowflake + // VARIANT/ARRAY type, which cannot be validated at unparse time. let snowflake = SnowflakeDialect::new(); roundtrip_statement_with_dialect_helper!( - sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)", + sql: "SELECT * FROM UNNEST([1,2,3]) u(c1) JOIN j1 ON u.c1 = j1.j1_id", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#, + ); + Ok(()) +} + +// --- Edge case tests for Snowflake FLATTEN --- + +#[test] +fn snowflake_flatten_implicit_from() -> Result<(), DataFusionError> { + // UNNEST in SELECT clause (no explicit FROM UNNEST) — implicit table factor + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3])", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @"SELECT _unnest.\"VALUE\" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest", + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_string_array() -> Result<(), DataFusionError> { + // String array unnest + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST(['a','b','c'])", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @"SELECT _unnest.\"VALUE\" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS _unnest", + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3]) as c1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_select_unnest_plus_literal() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3]), 1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT _unnest."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_from_unnest_with_table_alias() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3]) AS t1 (c1)", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "u"."array_col", "u"."struct_col", _unnest."VALUE" FROM "unnest_table" AS "u" CROSS JOIN LATERAL FLATTEN(INPUT => "u"."array_col") AS _unnest"#, + expected: @r#"SELECT "t1"."c1" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "t1""#, ); Ok(()) } From e70196235e3e38c4efef82f65d8c47561dbaa9cf Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sat, 11 Apr 2026 13:43:13 -0400 Subject: [PATCH 06/14] Snowflake Dialect --- datafusion/sql/src/unparser/plan.rs | 131 +++++++++++++++++++--- datafusion/sql/tests/cases/plan_to_sql.rs | 93 ++++++++++++++- 2 files changed, 201 insertions(+), 23 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index e08613e50e889..d894ac3df0937 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -423,7 +423,10 @@ impl Unparser<'_> { // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have // only one expression, which is the placeholder column generated by the rewriter. let unnest_input_type = if p.expr.len() == 1 { - Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) + Self::check_unnest_placeholder_with_outer_ref( + &p.expr[0], + self.dialect.unnest_as_lateral_flatten(), + ) } else { None }; @@ -434,26 +437,98 @@ impl Unparser<'_> { self.try_unnest_to_lateral_flatten_sql(unnest)? { let alias_name = flatten_relation.alias_name().to_string(); - relation.flatten(flatten_relation); - // Only set the projection if it hasn't already been set by - // an outer Projection (e.g. SELECT * FROM t, UNNEST(...)). - // When the outer Projection already called - // reconstruct_select_statement, the SELECT list is correct - // and we must not replace it with just `_unnest."VALUE"`. + // Check if the Unnest source is a real query (subselect) + // vs an inline array (EmptyRelation). + let inner_projection = match unnest.input.as_ref() { + LogicalPlan::Projection(proj) => proj, + other => { + return internal_err!( + "Unnest input is not a Projection: {other:?}" + ); + } + }; + + if matches!( + inner_projection.input.as_ref(), + LogicalPlan::EmptyRelation(_) + ) { + // Inline array case (e.g. UNNEST([1,2,3])): + // FLATTEN is the sole FROM source. + relation.flatten(flatten_relation); + + if !select.already_projected() { + let value_expr = + self.build_flatten_value_select_item(&alias_name); + select.projection(vec![value_expr]); + } + + return self.select_to_sql_recursively( + p.input.as_ref(), + query, + select, + relation, + ); + } + + // Non-empty source (table, subquery, etc.): + // The FLATTEN references columns/expressions from the source. + // Convert the inner Projection's expression to SQL for the + // FLATTEN INPUT, then store the FLATTEN to be added as a + // CROSS JOIN after the source relation is processed. + let first_expr = inner_projection.expr.first().ok_or_else(|| { + DataFusionError::Internal( + "Unnest inner projection has no expressions".to_string(), + ) + })?; + // Strip the __unnest_placeholder alias to get the raw expression + let raw_expr = match first_expr { + Expr::Alias(Alias { expr, .. }) => expr.as_ref(), + other => other, + }; + let input_sql = self.expr_to_sql(raw_expr)?; + + let mut flatten = FlattenRelationBuilder::default(); + flatten.input_expr(input_sql); + flatten.outer(unnest.options.preserve_nulls); + if !select.already_projected() { let value_expr = self.build_flatten_value_select_item(&alias_name); select.projection(vec![value_expr]); - select.already_projected(); } - return self.select_to_sql_recursively( + // Recurse into the Unnest → inner source to set the primary + // relation (table scan, subquery, etc.), then add FLATTEN + // as a CROSS JOIN. + self.select_to_sql_recursively( p.input.as_ref(), query, select, relation, - ); + )?; + + let flatten_factor = flatten.build().map_err(|e| { + DataFusionError::Internal(format!("Failed to build FLATTEN: {e}")) + })?; + let cross_join = ast::Join { + relation: flatten_factor, + global: false, + join_operator: ast::JoinOperator::CrossJoin( + ast::JoinConstraint::None, + ), + }; + if let Some(mut from) = select.pop_from() { + from.push_join(cross_join); + select.push_from(from); + } else { + // No existing FROM — create one with just the FLATTEN + let mut twj = TableWithJoinsBuilder::default(); + twj.push_join(cross_join); + select.push_from(twj); + } + + return Ok(()); } if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() @@ -1153,15 +1228,35 @@ impl Unparser<'_> { /// - If the column is not a placeholder column, return [None]. /// /// `outer_ref` is the display result of [Expr::OuterReferenceColumn] - fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option { - if let Expr::Alias(Alias { expr, .. }) = expr - && let Expr::Column(Column { name, .. }) = expr.as_ref() - && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) - { - if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { - return Some(UnnestInputType::OuterReference); + /// When `deep_peel` is true, peels through multiple Alias layers to find + /// the inner Column. This is needed for Snowflake FLATTEN where user aliases + /// (e.g. `AS items`) add extra Alias wrappers around the placeholder column. + fn check_unnest_placeholder_with_outer_ref( + expr: &Expr, + deep_peel: bool, + ) -> Option { + let inner = match expr { + Expr::Alias(Alias { expr, .. }) => { + if deep_peel { + // Peel through all Alias layers + let mut e = expr.as_ref(); + while let Expr::Alias(Alias { expr: next, .. }) = e { + e = next.as_ref(); + } + e + } else { + expr.as_ref() + } + } + _ => return None, + }; + if let Expr::Column(Column { name, .. }) = inner { + if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) { + if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { + return Some(UnnestInputType::OuterReference); + } + return Some(UnnestInputType::Scalar); } - return Some(UnnestInputType::Scalar); } None } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 141f7ec40dcd3..96f168f3b12fc 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -26,8 +26,9 @@ use datafusion_expr::test::function_stub::{ count_udaf, max, max_udaf, min_udaf, sum, sum_udaf, }; use datafusion_expr::{ - EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union, - UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame, + ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, + ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union, + UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame, WindowFunctionDefinition, cast, col, lit, table_scan, wildcard, }; use datafusion_functions::unicode; @@ -3032,7 +3033,7 @@ fn snowflake_unnest_to_lateral_flatten_cross_join_inline() -> Result<(), DataFus sql: "SELECT * FROM UNNEST([1,2,3]) u(c1) JOIN j1 ON u.c1 = j1.j1_id", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#, + expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#, ); Ok(()) } @@ -3072,7 +3073,7 @@ fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3]) as c1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + expected: @r#"SELECT _unnest."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, ); Ok(()) } @@ -3096,7 +3097,89 @@ fn snowflake_flatten_from_unnest_with_table_alias() -> Result<(), DataFusionErro sql: "SELECT * FROM UNNEST([1,2,3]) AS t1 (c1)", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "t1"."c1" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "t1""#, + expected: @r#"SELECT "t1"."c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "t1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> { + // UNNEST operating on an array column produced by a subselect. + // Uses unnest_table which has array_col (List). + // The filter uses array_col IS NOT NULL — a simple predicate + // that doesn't involve struct types (which Snowflake FLATTEN can't handle). + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST(array_col) FROM (SELECT array_col FROM unnest_table WHERE array_col IS NOT NULL LIMIT 3)", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT _unnest."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS _unnest"#, ); Ok(()) } + +/// Dummy scalar UDF for testing — takes a string and returns List. +#[derive(Debug, PartialEq, Eq, Hash)] +struct JsonGetArrayUdf { + signature: Signature, +} + +impl JsonGetArrayUdf { + fn new() -> Self { + Self { + signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for JsonGetArrayUdf { + fn name(&self) -> &str { + "json_get_array" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::List(Arc::new(Field::new_list_field( + DataType::Int64, + true, + )))) + } + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + unimplemented!("test stub") + } +} + +#[test] +fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { + // UNNEST on a UDF result: json_get_array(col) returns List, + // then UNNEST flattens it. This simulates a common Snowflake pattern + // where a UDF parses JSON into an array, then FLATTEN expands it. + let sql = "SELECT UNNEST(json_get_array(j1_string)) AS items FROM j1 LIMIT 5"; + + let statement = Parser::new(&GenericDialect {}) + .try_with_sql(sql)? + .parse_statement()?; + + let state = MockSessionState::default() + .with_aggregate_function(max_udaf()) + .with_aggregate_function(min_udaf()) + .with_scalar_function(Arc::new(ScalarUDF::new_from_impl(JsonGetArrayUdf::new()))) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())) + .with_expr_planner(Arc::new(NestedFunctionPlanner)) + .with_expr_planner(Arc::new(FieldAccessPlanner)); + + let context = MockContextProvider { state }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel + .sql_statement_to_plan(statement) + .unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}")); + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#); + Ok(()) +} From 93211381a2067ac8649b9b43b3d7c8ce422dc37f Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sat, 11 Apr 2026 15:34:14 -0400 Subject: [PATCH 07/14] fix: Snowflake LATERAL FLATTEN handles SubqueryAlias between Unnest and Projection When a table is accessed through a passthrough/virtual table mapping, DataFusion inserts a SubqueryAlias node between Unnest and its inner Projection. The FLATTEN rendering code assumed a direct Projection child and failed with "Unnest input is not a Projection: SubqueryAlias(...)". Peel through SubqueryAlias in three code paths that inspect unnest.input: try_unnest_to_lateral_flatten_sql, the inline-vs-table source check, and the general unnest recursion. Also fix a pre-existing collapsible_if clippy warning in check_unnest_placeholder_with_outer_ref. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/plan.rs | 37 ++++++++++++++++++----- datafusion/sql/tests/cases/plan_to_sql.rs | 33 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index d894ac3df0937..6ec09fb32c705 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -442,6 +442,14 @@ impl Unparser<'_> { // vs an inline array (EmptyRelation). let inner_projection = match unnest.input.as_ref() { LogicalPlan::Projection(proj) => proj, + LogicalPlan::SubqueryAlias(alias) => match alias.input.as_ref() { + LogicalPlan::Projection(proj) => proj, + other => { + return internal_err!( + "Unnest input (through SubqueryAlias) is not a Projection: {other:?}" + ); + } + }, other => { return internal_err!( "Unnest input is not a Projection: {other:?}" @@ -1186,6 +1194,11 @@ impl Unparser<'_> { if let LogicalPlan::Projection(p) = unnest.input.as_ref() { // continue with projection input self.select_to_sql_recursively(&p.input, query, select, relation) + } else if let LogicalPlan::SubqueryAlias(alias) = unnest.input.as_ref() + && let LogicalPlan::Projection(p) = alias.input.as_ref() + { + // SubqueryAlias wraps the Projection (e.g. passthrough tables) + self.select_to_sql_recursively(&p.input, query, select, relation) } else { internal_err!("Unnest input is not a Projection: {unnest:?}") } @@ -1250,13 +1263,13 @@ impl Unparser<'_> { } _ => return None, }; - if let Expr::Column(Column { name, .. }) = inner { - if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) { - if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { - return Some(UnnestInputType::OuterReference); - } - return Some(UnnestInputType::Scalar); + if let Expr::Column(Column { name, .. }) = inner + && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) + { + if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { + return Some(UnnestInputType::OuterReference); } + return Some(UnnestInputType::Scalar); } None } @@ -1306,8 +1319,16 @@ impl Unparser<'_> { &self, unnest: &Unnest, ) -> Result> { - let LogicalPlan::Projection(projection) = unnest.input.as_ref() else { - return Ok(None); + let projection = match unnest.input.as_ref() { + LogicalPlan::Projection(p) => p, + LogicalPlan::SubqueryAlias(alias) => { + if let LogicalPlan::Projection(p) = alias.input.as_ref() { + p + } else { + return Ok(None); + } + } + _ => return Ok(None), }; // For now, handle the simple case of a single expression to flatten. diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 96f168f3b12fc..d0d872adf1be9 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3183,3 +3183,36 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#); Ok(()) } + +#[test] +fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { + // Build: Projection → Unnest → SubqueryAlias → Projection → TableScan + // This simulates the plan produced when a virtual/passthrough table + // wraps the source in a SubqueryAlias, which sits between the Unnest + // and its inner Projection. + + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .alias("t")? // SubqueryAlias — this is what breaks + .unnest_column("__unnest_placeholder(items)")? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let sql_str = result.to_string(); + + // Should contain LATERAL FLATTEN, not error + assert!( + sql_str.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {sql_str}" + ); + Ok(()) +} From 065e1111a5f7e9307d2d9b97dc9770bfefb29db3 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 12 Apr 2026 10:21:55 -0400 Subject: [PATCH 08/14] Snowflake Dialect --- datafusion/sql/src/unparser/plan.rs | 85 ++++++++++++++--------- datafusion/sql/tests/cases/plan_to_sql.rs | 4 +- 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 6ec09fb32c705..5ebbac786c3a5 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -318,6 +318,7 @@ impl Unparser<'_> { if is_unnest_col { return Ok(self.build_flatten_value_select_item( FLATTEN_DEFAULT_ALIAS, + None, )); } } @@ -423,10 +424,14 @@ impl Unparser<'_> { // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have // only one expression, which is the placeholder column generated by the rewriter. let unnest_input_type = if p.expr.len() == 1 { - Self::check_unnest_placeholder_with_outer_ref( - &p.expr[0], - self.dialect.unnest_as_lateral_flatten(), - ) + Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) + } else { + None + }; + // Extract the outermost user alias (e.g. "c1" from `UNNEST(...) AS c1`). + // Internal aliases like "UNNEST(...)" are not user aliases. + let user_alias = if unnest_input_type.is_some() { + Self::extract_unnest_user_alias(&p.expr[0]) } else { None }; @@ -466,8 +471,10 @@ impl Unparser<'_> { relation.flatten(flatten_relation); if !select.already_projected() { - let value_expr = - self.build_flatten_value_select_item(&alias_name); + let value_expr = self.build_flatten_value_select_item( + &alias_name, + user_alias.as_deref(), + ); select.projection(vec![value_expr]); } @@ -501,8 +508,10 @@ impl Unparser<'_> { flatten.outer(unnest.options.preserve_nulls); if !select.already_projected() { - let value_expr = - self.build_flatten_value_select_item(&alias_name); + let value_expr = self.build_flatten_value_select_item( + &alias_name, + user_alias.as_deref(), + ); select.projection(vec![value_expr]); } @@ -540,6 +549,7 @@ impl Unparser<'_> { } if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() + && user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases && let LogicalPlan::Unnest(unnest) = &p.input.as_ref() && let Some(unnest_relation) = self.try_unnest_to_table_factor_sql(unnest)? @@ -1241,28 +1251,14 @@ impl Unparser<'_> { /// - If the column is not a placeholder column, return [None]. /// /// `outer_ref` is the display result of [Expr::OuterReferenceColumn] - /// When `deep_peel` is true, peels through multiple Alias layers to find - /// the inner Column. This is needed for Snowflake FLATTEN where user aliases - /// (e.g. `AS items`) add extra Alias wrappers around the placeholder column. - fn check_unnest_placeholder_with_outer_ref( - expr: &Expr, - deep_peel: bool, - ) -> Option { - let inner = match expr { - Expr::Alias(Alias { expr, .. }) => { - if deep_peel { - // Peel through all Alias layers - let mut e = expr.as_ref(); - while let Expr::Alias(Alias { expr: next, .. }) = e { - e = next.as_ref(); - } - e - } else { - expr.as_ref() - } - } - _ => return None, - }; + fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option { + // Peel through all Alias layers to find the inner Column. + // The expression may have multiple aliases, e.g.: + // Alias("items", Alias("UNNEST(...)", Column("__unnest_placeholder(...)"))) + let mut inner = expr; + while let Expr::Alias(Alias { expr, .. }) = inner { + inner = expr.as_ref(); + } if let Expr::Column(Column { name, .. }) = inner && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) { @@ -1274,6 +1270,19 @@ impl Unparser<'_> { None } + /// Extract the outermost user-provided alias from an unnest expression. + /// Returns `None` if the outermost alias is DataFusion's internal display + /// name (e.g. `UNNEST(make_array(...))`), or if there is no alias at all. + fn extract_unnest_user_alias(expr: &Expr) -> Option { + if let Expr::Alias(Alias { name, .. }) = expr { + // Internal aliases start with "UNNEST(" — user aliases don't. + if !name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) { + return Some(name.clone()); + } + } + None + } + fn try_unnest_to_table_factor_sql( &self, unnest: &Unnest, @@ -1305,12 +1314,22 @@ impl Unparser<'_> { } /// Build a `SELECT alias."VALUE"` item for Snowflake FLATTEN output. - fn build_flatten_value_select_item(&self, alias_name: &str) -> ast::SelectItem { + fn build_flatten_value_select_item( + &self, + flatten_alias: &str, + user_alias: Option<&str>, + ) -> ast::SelectItem { let compound = ast::Expr::CompoundIdentifier(vec![ - self.new_ident_without_quote_style(alias_name.to_string()), + self.new_ident_without_quote_style(flatten_alias.to_string()), Ident::with_quote('"', "VALUE"), ]); - ast::SelectItem::UnnamedExpr(compound) + match user_alias { + Some(alias) => ast::SelectItem::ExprWithAlias { + expr: compound, + alias: self.new_ident_quoted_if_needs(alias.to_string()), + }, + None => ast::SelectItem::UnnamedExpr(compound), + } } /// Convert an `Unnest` logical plan node to a `LATERAL FLATTEN(INPUT => expr, ...)` diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index d0d872adf1be9..5e498d87c40ef 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3073,7 +3073,7 @@ fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3]) as c1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + expected: @r#"SELECT _unnest."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, ); Ok(()) } @@ -3180,7 +3180,7 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#); Ok(()) } From 909d8c6829c9469e49f8752967fb222fe5955055 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 12 Apr 2026 21:23:01 -0400 Subject: [PATCH 09/14] More snowflake dialect fixes for unnest --- datafusion/sql/src/unparser/plan.rs | 165 ++++++++++++++++------ datafusion/sql/tests/cases/plan_to_sql.rs | 99 +++++++++++++ 2 files changed, 221 insertions(+), 43 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 5ebbac786c3a5..d2845650265d3 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -437,7 +437,7 @@ impl Unparser<'_> { }; if self.dialect.unnest_as_lateral_flatten() && unnest_input_type.is_some() - && let LogicalPlan::Unnest(unnest) = p.input.as_ref() + && let Some(unnest) = Self::peel_to_unnest(p.input.as_ref()) && let Some(flatten_relation) = self.try_unnest_to_lateral_flatten_sql(unnest)? { @@ -445,22 +445,23 @@ impl Unparser<'_> { // Check if the Unnest source is a real query (subselect) // vs an inline array (EmptyRelation). - let inner_projection = match unnest.input.as_ref() { - LogicalPlan::Projection(proj) => proj, - LogicalPlan::SubqueryAlias(alias) => match alias.input.as_ref() { - LogicalPlan::Projection(proj) => proj, - other => { - return internal_err!( - "Unnest input (through SubqueryAlias) is not a Projection: {other:?}" - ); - } - }, - other => { - return internal_err!( - "Unnest input is not a Projection: {other:?}" - ); - } - }; + let inner_projection = + Self::peel_to_inner_projection(unnest.input.as_ref()) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Unnest input is not a Projection: {:?}", + unnest.input + )) + })?; + + // Apply any intermediate Limit/Sort modifiers and get + // a reference to the Unnest LogicalPlan node for recursion. + // This bypasses the Limit/Sort handlers (which would create + // unwanted derived subqueries when already_projected is set). + let unnest_plan = self.apply_transparent_and_find_unnest_plan( + p.input.as_ref(), + query, + )?; if matches!( inner_projection.input.as_ref(), @@ -479,7 +480,7 @@ impl Unparser<'_> { } return self.select_to_sql_recursively( - p.input.as_ref(), + unnest_plan, query, select, relation, @@ -518,12 +519,7 @@ impl Unparser<'_> { // Recurse into the Unnest → inner source to set the primary // relation (table scan, subquery, etc.), then add FLATTEN // as a CROSS JOIN. - self.select_to_sql_recursively( - p.input.as_ref(), - query, - select, - relation, - )?; + self.select_to_sql_recursively(unnest_plan, query, select, relation)?; let flatten_factor = flatten.build().map_err(|e| { DataFusionError::Internal(format!("Failed to build FLATTEN: {e}")) @@ -550,14 +546,18 @@ impl Unparser<'_> { if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() && user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases - && let LogicalPlan::Unnest(unnest) = &p.input.as_ref() + && let Some(unnest) = Self::peel_to_unnest(p.input.as_ref()) && let Some(unnest_relation) = self.try_unnest_to_table_factor_sql(unnest)? { relation.unnest(unnest_relation); - return self.select_to_sql_recursively( + let unnest_plan = self.apply_transparent_and_find_unnest_plan( p.input.as_ref(), query, + )?; + return self.select_to_sql_recursively( + unnest_plan, + query, select, relation, ); @@ -1201,13 +1201,9 @@ impl Unparser<'_> { // | Projection: table.col1, table.col2 AS UNNEST(table.col2) // | Filter: table.col3 = Int64(3) // | TableScan: table projection=None - if let LogicalPlan::Projection(p) = unnest.input.as_ref() { - // continue with projection input - self.select_to_sql_recursively(&p.input, query, select, relation) - } else if let LogicalPlan::SubqueryAlias(alias) = unnest.input.as_ref() - && let LogicalPlan::Projection(p) = alias.input.as_ref() - { - // SubqueryAlias wraps the Projection (e.g. passthrough tables) + if let Some(p) = Self::peel_to_inner_projection(unnest.input.as_ref()) { + // Skip the inner Projection (synthetic rewriter node) + // and continue with its input. self.select_to_sql_recursively(&p.input, query, select, relation) } else { internal_err!("Unnest input is not a Projection: {unnest:?}") @@ -1242,6 +1238,96 @@ impl Unparser<'_> { } } + /// Walk through "transparent" plan nodes (Limit, Sort) to find an Unnest. + /// + /// The DataFusion optimizer may insert Limit or Sort between the outer + /// Projection and the Unnest node. These nodes modify result quantity or + /// ordering but do not change the plan shape for unnest detection. The + /// normal recursion in [`Self::select_to_sql_recursively`] handles their + /// SQL rendering (LIMIT/OFFSET/ORDER BY); this helper only needs to + /// locate the Unnest so the FLATTEN / table-factor code path can fire. + fn peel_to_unnest(plan: &LogicalPlan) -> Option<&Unnest> { + match plan { + LogicalPlan::Unnest(unnest) => Some(unnest), + LogicalPlan::Limit(limit) => Self::peel_to_unnest(limit.input.as_ref()), + LogicalPlan::Sort(sort) => Self::peel_to_unnest(sort.input.as_ref()), + _ => None, + } + } + + /// Walk through "transparent" plan nodes (SubqueryAlias, Limit, Sort) to + /// find the inner Projection that feeds an Unnest node. + /// + /// The inner Projection is created by the `RecursiveUnnestRewriter` and + /// contains the array expression that the Unnest operates on. A + /// `SubqueryAlias` (e.g. from a virtual/passthrough table) may wrap the + /// Projection; Limit/Sort are also handled for robustness. + fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> { + match plan { + LogicalPlan::Projection(p) => Some(p), + LogicalPlan::SubqueryAlias(alias) => { + Self::peel_to_inner_projection(alias.input.as_ref()) + } + LogicalPlan::Limit(limit) => { + Self::peel_to_inner_projection(limit.input.as_ref()) + } + LogicalPlan::Sort(sort) => { + Self::peel_to_inner_projection(sort.input.as_ref()) + } + _ => None, + } + } + + /// Walk through transparent nodes (Limit, Sort) between the outer + /// Projection and the Unnest, applying their SQL modifiers (LIMIT, + /// OFFSET, ORDER BY) to the query builder. Returns a reference to the + /// Unnest `LogicalPlan` node so the caller can recurse into it directly, + /// bypassing the intermediate handlers that would otherwise create + /// unwanted derived subqueries. + fn apply_transparent_and_find_unnest_plan<'a>( + &self, + plan: &'a LogicalPlan, + query: &mut Option, + ) -> Result<&'a LogicalPlan> { + match plan { + LogicalPlan::Unnest(_) => Ok(plan), + LogicalPlan::Limit(limit) => { + if let Some(fetch) = &limit.fetch + && let Some(q) = query.as_mut() + { + q.limit(Some(self.expr_to_sql(fetch)?)); + } + if let Some(skip) = &limit.skip + && let Some(q) = query.as_mut() + { + q.offset(Some(ast::Offset { + rows: ast::OffsetRows::None, + value: self.expr_to_sql(skip)?, + })); + } + self.apply_transparent_and_find_unnest_plan(limit.input.as_ref(), query) + } + LogicalPlan::Sort(sort) => { + let Some(query_ref) = query.as_mut() else { + return internal_err!( + "Sort between Projection and Unnest requires a statement context." + ); + }; + if let Some(fetch) = sort.fetch { + query_ref.limit(Some(ast::Expr::value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } + query_ref.order_by(self.sorts_to_sql(&sort.expr)?); + self.apply_transparent_and_find_unnest_plan(sort.input.as_ref(), query) + } + other => { + internal_err!("Unexpected node between Projection and Unnest: {other:?}") + } + } + } + /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`. /// /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`, @@ -1338,16 +1424,9 @@ impl Unparser<'_> { &self, unnest: &Unnest, ) -> Result> { - let projection = match unnest.input.as_ref() { - LogicalPlan::Projection(p) => p, - LogicalPlan::SubqueryAlias(alias) => { - if let LogicalPlan::Projection(p) = alias.input.as_ref() { - p - } else { - return Ok(None); - } - } - _ => return Ok(None), + let Some(projection) = Self::peel_to_inner_projection(unnest.input.as_ref()) + else { + return Ok(None); }; // For now, handle the simple case of a single expression to flatten. diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 5e498d87c40ef..c377d85504103 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3184,6 +3184,105 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { Ok(()) } +#[test] +fn snowflake_flatten_limit_between_projection_and_unnest() -> Result<(), DataFusionError> +{ + // Build: Projection → Limit → Unnest → Projection → TableScan + // The optimizer can insert a Limit between the outer Projection and the + // Unnest. The FLATTEN code path must look through transparent nodes + // (Limit, Sort) to find the Unnest. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(5))? // Limit BETWEEN outer Projection and Unnest + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN — the Limit must not prevent FLATTEN detection + insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS _unnest LIMIT 5"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_sort_between_projection_and_unnest() -> Result<(), DataFusionError> { + // Build: Projection → Sort → Unnest → Projection → TableScan + // Same as Limit test but with Sort instead. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .sort(vec![col("__unnest_placeholder(items)").sort(true, true)])? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN — the Sort must not prevent FLATTEN detection + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + assert!( + actual.contains("ORDER BY"), + "Expected ORDER BY in SQL, got: {actual}" + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_limit_between_projection_and_unnest_with_subquery_alias() +-> Result<(), DataFusionError> { + // Build: Projection → Limit → Unnest → SubqueryAlias → Projection → TableScan + // Combines the Limit and SubqueryAlias transparent node patterns. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .alias("t")? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(10))? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + assert!( + actual.contains("LIMIT 10"), + "Expected LIMIT 10 in SQL, got: {actual}" + ); + Ok(()) +} + #[test] fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { // Build: Projection → Unnest → SubqueryAlias → Projection → TableScan From 54d8a1d9d6a1e8a96257a7eceb08f9d6f9c84df8 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 12 Apr 2026 21:58:12 -0400 Subject: [PATCH 10/14] More snowflake dialect fixes for unnes --- datafusion/sql/src/unparser/plan.rs | 89 +++++++++++++++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 77 ++++++++++++++++++++ 2 files changed, 150 insertions(+), 16 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index d2845650265d3..48d34176d58b9 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -45,7 +45,7 @@ use crate::utils::UNNEST_PLACEHOLDER; use datafusion_common::{ Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err, internal_err, not_impl_err, - tree_node::{TransformedResult, TreeNode}, + tree_node::{TransformedResult, TreeNode, TreeNodeRecursion}, }; use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX}; use datafusion_expr::{ @@ -420,17 +420,33 @@ impl Unparser<'_> { .select_to_sql_recursively(&new_plan, query, select, relation); } - // Projection can be top-level plan for unnest relation + // Projection can be top-level plan for unnest relation. // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have // only one expression, which is the placeholder column generated by the rewriter. - let unnest_input_type = if p.expr.len() == 1 { - Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) + // + // Two cases: + // - "bare": the expression IS the placeholder (+ aliases): + // `__unnest_placeholder(...) AS item` + // - "wrapped": the placeholder is inside a function call: + // `json_as_text(__unnest_placeholder(...), 'type') AS target_type` + let (unnest_input_type, placeholder_is_bare) = if p.expr.len() == 1 { + if let Some(t) = + Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) + { + (Some(t), true) + } else if let Some(t) = + Self::find_unnest_placeholder_in_expr(&p.expr[0]) + { + (Some(t), false) + } else { + (None, false) + } } else { - None + (None, false) }; // Extract the outermost user alias (e.g. "c1" from `UNNEST(...) AS c1`). // Internal aliases like "UNNEST(...)" are not user aliases. - let user_alias = if unnest_input_type.is_some() { + let user_alias = if placeholder_is_bare && unnest_input_type.is_some() { Self::extract_unnest_user_alias(&p.expr[0]) } else { None @@ -472,11 +488,15 @@ impl Unparser<'_> { relation.flatten(flatten_relation); if !select.already_projected() { - let value_expr = self.build_flatten_value_select_item( - &alias_name, - user_alias.as_deref(), - ); - select.projection(vec![value_expr]); + if placeholder_is_bare { + let value_expr = self.build_flatten_value_select_item( + &alias_name, + user_alias.as_deref(), + ); + select.projection(vec![value_expr]); + } else { + self.reconstruct_select_statement(plan, p, select)?; + } } return self.select_to_sql_recursively( @@ -509,11 +529,15 @@ impl Unparser<'_> { flatten.outer(unnest.options.preserve_nulls); if !select.already_projected() { - let value_expr = self.build_flatten_value_select_item( - &alias_name, - user_alias.as_deref(), - ); - select.projection(vec![value_expr]); + if placeholder_is_bare { + let value_expr = self.build_flatten_value_select_item( + &alias_name, + user_alias.as_deref(), + ); + select.projection(vec![value_expr]); + } else { + self.reconstruct_select_statement(plan, p, select)?; + } } // Recurse into the Unnest → inner source to set the primary @@ -1356,6 +1380,39 @@ impl Unparser<'_> { None } + /// Recursively search the expression tree for an unnest placeholder column. + /// + /// Unlike [`Self::check_unnest_placeholder_with_outer_ref`] which only + /// matches when the placeholder IS the expression (modulo aliases), this + /// function finds placeholders buried inside function calls or other + /// transformations. For example: + /// + /// ```text + /// Alias("target_type", + /// json_as_text( + /// Column("__unnest_placeholder(...)"), ← found here + /// Literal("type"))) + /// ``` + fn find_unnest_placeholder_in_expr(expr: &Expr) -> Option { + let mut result = None; + let _ = expr.apply(|e| { + if let Expr::Column(Column { name, .. }) = e + && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) + { + result = if prefix + .starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) + { + Some(UnnestInputType::OuterReference) + } else { + Some(UnnestInputType::Scalar) + }; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + }); + result + } + /// Extract the outermost user-provided alias from an unnest expression. /// Returns `None` if the outermost alias is DataFusion's internal display /// name (e.g. `UNNEST(make_array(...))`), or if there is no alias at all. diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index c377d85504103..0e4ead95509a8 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3283,6 +3283,83 @@ fn snowflake_flatten_limit_between_projection_and_unnest_with_subquery_alias() Ok(()) } +#[test] +fn snowflake_flatten_composed_expression_wrapping_unnest() -> Result<(), DataFusionError> +{ + // Build: Projection(CAST(placeholder AS Int64) AS item_id) → Unnest → Projection → TableScan + // The outer Projection wraps the unnest output in a function call. + // The FLATTEN code path must detect the placeholder inside the function + // and still emit LATERAL FLATTEN. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("item_id"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN despite the placeholder being inside CAST + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + assert!( + actual.contains("CAST"), + "Expected CAST in SQL, got: {actual}" + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_composed_expression_with_limit() -> Result<(), DataFusionError> { + // Combines both bugs: composed expression + Limit between Projection and Unnest + // Build: Projection(CAST(placeholder AS Int64) AS item_id) → Limit → Unnest → Projection → TableScan + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(5))? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("item_id"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + assert!( + actual.contains("CAST"), + "Expected CAST in SQL, got: {actual}" + ); + assert!( + actual.contains("LIMIT 5"), + "Expected LIMIT 5 in SQL, got: {actual}" + ); + Ok(()) +} + #[test] fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { // Build: Projection → Unnest → SubqueryAlias → Projection → TableScan From c083c6c52b01f46334c8005c7298d5c929f9863d Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 12 Apr 2026 23:25:02 -0400 Subject: [PATCH 11/14] Quoting fix --- datafusion/sql/src/unparser/ast.rs | 2 +- datafusion/sql/src/unparser/plan.rs | 267 ++++++++-------------- datafusion/sql/tests/cases/plan_to_sql.rs | 18 +- 3 files changed, 103 insertions(+), 184 deletions(-) diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index b49313f9179e5..9b3fd4151d946 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -769,7 +769,7 @@ impl FlattenRelationBuilder { fn create_empty() -> Self { Self { alias: Some(ast::TableAlias { - name: ast::Ident::new(FLATTEN_DEFAULT_ALIAS), + name: ast::Ident::with_quote('"', FLATTEN_DEFAULT_ALIAS), columns: vec![], explicit: true, }), diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 48d34176d58b9..239b2b5a8b70b 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -421,25 +421,16 @@ impl Unparser<'_> { } // Projection can be top-level plan for unnest relation. - // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have - // only one expression, which is the placeholder column generated by the rewriter. - // - // Two cases: - // - "bare": the expression IS the placeholder (+ aliases): - // `__unnest_placeholder(...) AS item` - // - "wrapped": the placeholder is inside a function call: - // `json_as_text(__unnest_placeholder(...), 'type') AS target_type` + // The projection generated by the `RecursiveUnnestRewriter` + // will have exactly one expression referencing the unnest + // placeholder column. The placeholder may be at the top level + // ("bare": `__unnest_placeholder(...) AS item`) or wrapped + // inside a function call ("wrapped": + // `json_as_text(__unnest_placeholder(...), 'type') AS t`). let (unnest_input_type, placeholder_is_bare) = if p.expr.len() == 1 { - if let Some(t) = - Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) - { - (Some(t), true) - } else if let Some(t) = - Self::find_unnest_placeholder_in_expr(&p.expr[0]) - { - (Some(t), false) - } else { - (None, false) + match Self::find_unnest_placeholder(&p.expr[0]) { + Some((t, is_bare)) => (Some(t), is_bare), + None => (None, false), } } else { (None, false) @@ -451,16 +442,19 @@ impl Unparser<'_> { } else { None }; + // Snowflake LATERAL FLATTEN path. + // `peel_to_unnest_with_modifiers` walks through any + // intermediate Limit/Sort nodes, applies their modifiers to + // the query, and returns the Unnest + the LogicalPlan ref to + // recurse into (bypassing the normal Limit/Sort handlers that + // would create a derived subquery). if self.dialect.unnest_as_lateral_flatten() && unnest_input_type.is_some() - && let Some(unnest) = Self::peel_to_unnest(p.input.as_ref()) - && let Some(flatten_relation) = + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? + && let Some(flatten) = self.try_unnest_to_lateral_flatten_sql(unnest)? { - let alias_name = flatten_relation.alias_name().to_string(); - - // Check if the Unnest source is a real query (subselect) - // vs an inline array (EmptyRelation). let inner_projection = Self::peel_to_inner_projection(unnest.input.as_ref()) .ok_or_else(|| { @@ -470,14 +464,22 @@ impl Unparser<'_> { )) })?; - // Apply any intermediate Limit/Sort modifiers and get - // a reference to the Unnest LogicalPlan node for recursion. - // This bypasses the Limit/Sort handlers (which would create - // unwanted derived subqueries when already_projected is set). - let unnest_plan = self.apply_transparent_and_find_unnest_plan( - p.input.as_ref(), - query, - )?; + // Set the SELECT projection. + if !select.already_projected() { + if placeholder_is_bare { + let value_expr = self.build_flatten_value_select_item( + flatten.alias_name(), + user_alias.as_deref(), + ); + select.projection(vec![value_expr]); + } else { + // Composed expression wrapping the placeholder: + // reconstruct_select_statement rewrites the + // placeholder column to `_unnest."VALUE"` via + // unproject_unnest_expr_as_flatten_value. + self.reconstruct_select_statement(plan, p, select)?; + } + } if matches!( inner_projection.input.as_ref(), @@ -485,20 +487,7 @@ impl Unparser<'_> { ) { // Inline array case (e.g. UNNEST([1,2,3])): // FLATTEN is the sole FROM source. - relation.flatten(flatten_relation); - - if !select.already_projected() { - if placeholder_is_bare { - let value_expr = self.build_flatten_value_select_item( - &alias_name, - user_alias.as_deref(), - ); - select.projection(vec![value_expr]); - } else { - self.reconstruct_select_statement(plan, p, select)?; - } - } - + relation.flatten(flatten); return self.select_to_sql_recursively( unnest_plan, query, @@ -508,41 +497,9 @@ impl Unparser<'_> { } // Non-empty source (table, subquery, etc.): - // The FLATTEN references columns/expressions from the source. - // Convert the inner Projection's expression to SQL for the - // FLATTEN INPUT, then store the FLATTEN to be added as a - // CROSS JOIN after the source relation is processed. - let first_expr = inner_projection.expr.first().ok_or_else(|| { - DataFusionError::Internal( - "Unnest inner projection has no expressions".to_string(), - ) - })?; - // Strip the __unnest_placeholder alias to get the raw expression - let raw_expr = match first_expr { - Expr::Alias(Alias { expr, .. }) => expr.as_ref(), - other => other, - }; - let input_sql = self.expr_to_sql(raw_expr)?; - - let mut flatten = FlattenRelationBuilder::default(); - flatten.input_expr(input_sql); - flatten.outer(unnest.options.preserve_nulls); - - if !select.already_projected() { - if placeholder_is_bare { - let value_expr = self.build_flatten_value_select_item( - &alias_name, - user_alias.as_deref(), - ); - select.projection(vec![value_expr]); - } else { - self.reconstruct_select_statement(plan, p, select)?; - } - } - - // Recurse into the Unnest → inner source to set the primary - // relation (table scan, subquery, etc.), then add FLATTEN - // as a CROSS JOIN. + // Recurse into the Unnest → inner source to set the + // primary FROM relation, then add the FLATTEN as a + // CROSS JOIN. self.select_to_sql_recursively(unnest_plan, query, select, relation)?; let flatten_factor = flatten.build().map_err(|e| { @@ -559,7 +516,6 @@ impl Unparser<'_> { from.push_join(cross_join); select.push_from(from); } else { - // No existing FROM — create one with just the FLATTEN let mut twj = TableWithJoinsBuilder::default(); twj.push_join(cross_join); select.push_from(twj); @@ -567,18 +523,16 @@ impl Unparser<'_> { return Ok(()); } + // Standard UNNEST table factor path (BigQuery, etc.). if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() && user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases - && let Some(unnest) = Self::peel_to_unnest(p.input.as_ref()) + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? && let Some(unnest_relation) = self.try_unnest_to_table_factor_sql(unnest)? { relation.unnest(unnest_relation); - let unnest_plan = self.apply_transparent_and_find_unnest_plan( - p.input.as_ref(), - query, - )?; return self.select_to_sql_recursively( unnest_plan, query, @@ -1262,59 +1216,40 @@ impl Unparser<'_> { } } - /// Walk through "transparent" plan nodes (Limit, Sort) to find an Unnest. - /// - /// The DataFusion optimizer may insert Limit or Sort between the outer - /// Projection and the Unnest node. These nodes modify result quantity or - /// ordering but do not change the plan shape for unnest detection. The - /// normal recursion in [`Self::select_to_sql_recursively`] handles their - /// SQL rendering (LIMIT/OFFSET/ORDER BY); this helper only needs to - /// locate the Unnest so the FLATTEN / table-factor code path can fire. - fn peel_to_unnest(plan: &LogicalPlan) -> Option<&Unnest> { - match plan { - LogicalPlan::Unnest(unnest) => Some(unnest), - LogicalPlan::Limit(limit) => Self::peel_to_unnest(limit.input.as_ref()), - LogicalPlan::Sort(sort) => Self::peel_to_unnest(sort.input.as_ref()), - _ => None, - } - } - - /// Walk through "transparent" plan nodes (SubqueryAlias, Limit, Sort) to - /// find the inner Projection that feeds an Unnest node. + /// Walk through transparent nodes (SubqueryAlias) to find the inner + /// Projection that feeds an Unnest node. /// - /// The inner Projection is created by the `RecursiveUnnestRewriter` and - /// contains the array expression that the Unnest operates on. A - /// `SubqueryAlias` (e.g. from a virtual/passthrough table) may wrap the - /// Projection; Limit/Sort are also handled for robustness. + /// The inner Projection is created atomically by the + /// `RecursiveUnnestRewriter` and contains the array expression that the + /// Unnest operates on. A `SubqueryAlias` (e.g. from a virtual/passthrough + /// table) may wrap the Projection. fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> { match plan { LogicalPlan::Projection(p) => Some(p), LogicalPlan::SubqueryAlias(alias) => { Self::peel_to_inner_projection(alias.input.as_ref()) } - LogicalPlan::Limit(limit) => { - Self::peel_to_inner_projection(limit.input.as_ref()) - } - LogicalPlan::Sort(sort) => { - Self::peel_to_inner_projection(sort.input.as_ref()) - } _ => None, } } /// Walk through transparent nodes (Limit, Sort) between the outer /// Projection and the Unnest, applying their SQL modifiers (LIMIT, - /// OFFSET, ORDER BY) to the query builder. Returns a reference to the - /// Unnest `LogicalPlan` node so the caller can recurse into it directly, - /// bypassing the intermediate handlers that would otherwise create - /// unwanted derived subqueries. - fn apply_transparent_and_find_unnest_plan<'a>( + /// OFFSET, ORDER BY) to the query builder. Returns the `Unnest` node + /// and a reference to the enclosing `LogicalPlan` for recursion, or + /// `Ok(None)` if no Unnest is found. + /// + /// By processing Limit/Sort inline and then recursing into the Unnest + /// plan directly, we bypass the normal Limit/Sort handlers which would + /// create unwanted derived subqueries (since `already_projected` is + /// set at the point this is called). + fn peel_to_unnest_with_modifiers<'a>( &self, plan: &'a LogicalPlan, query: &mut Option, - ) -> Result<&'a LogicalPlan> { + ) -> Result> { match plan { - LogicalPlan::Unnest(_) => Ok(plan), + LogicalPlan::Unnest(unnest) => Ok(Some((unnest, plan))), LogicalPlan::Limit(limit) => { if let Some(fetch) = &limit.fetch && let Some(q) = query.as_mut() @@ -1329,7 +1264,7 @@ impl Unparser<'_> { value: self.expr_to_sql(skip)?, })); } - self.apply_transparent_and_find_unnest_plan(limit.input.as_ref(), query) + self.peel_to_unnest_with_modifiers(limit.input.as_ref(), query) } LogicalPlan::Sort(sort) => { let Some(query_ref) = query.as_mut() else { @@ -1344,32 +1279,49 @@ impl Unparser<'_> { )))); } query_ref.order_by(self.sorts_to_sql(&sort.expr)?); - self.apply_transparent_and_find_unnest_plan(sort.input.as_ref(), query) - } - other => { - internal_err!("Unexpected node between Projection and Unnest: {other:?}") + self.peel_to_unnest_with_modifiers(sort.input.as_ref(), query) } + _ => Ok(None), } } - /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`. + /// Search an expression for an unnest placeholder column reference. /// - /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`, - /// it means it is a scalar column, return [UnnestInputType::Scalar]. - /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`, - /// it means it is an outer reference column, return [UnnestInputType::OuterReference]. - /// - If the column is not a placeholder column, return [None]. + /// Returns both the [`UnnestInputType`] and whether the placeholder is + /// "bare" (the expression IS the placeholder, modulo aliases) or + /// "wrapped" (the placeholder is inside a function call or other + /// transformation). /// - /// `outer_ref` is the display result of [Expr::OuterReferenceColumn] - fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option { - // Peel through all Alias layers to find the inner Column. - // The expression may have multiple aliases, e.g.: - // Alias("items", Alias("UNNEST(...)", Column("__unnest_placeholder(...)"))) + /// Examples: + /// - `Alias("item", Column("__unnest_placeholder(...)"))` → `Some((Scalar, true))` + /// - `Alias("t", Cast(Column("__unnest_placeholder(...)"), Int64))` → `Some((Scalar, false))` + /// - `Column("unrelated")` → `None` + fn find_unnest_placeholder(expr: &Expr) -> Option<(UnnestInputType, bool)> { + // Fast path: check if the expression IS the placeholder (peel aliases). let mut inner = expr; while let Expr::Alias(Alias { expr, .. }) = inner { inner = expr.as_ref(); } - if let Expr::Column(Column { name, .. }) = inner + if let Some(t) = Self::classify_placeholder_column(inner) { + return Some((t, true)); + } + // Slow path: walk the full expression tree. + let mut result = None; + let _ = expr.apply(|e| { + if let Some(t) = Self::classify_placeholder_column(e) { + result = Some(t); + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + }); + result.map(|t| (t, false)) + } + + /// If `expr` is a `Column` whose name starts with `__unnest_placeholder`, + /// classify it as [`UnnestInputType::OuterReference`] or + /// [`UnnestInputType::Scalar`]. + fn classify_placeholder_column(expr: &Expr) -> Option { + if let Expr::Column(Column { name, .. }) = expr && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) { if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { @@ -1380,39 +1332,6 @@ impl Unparser<'_> { None } - /// Recursively search the expression tree for an unnest placeholder column. - /// - /// Unlike [`Self::check_unnest_placeholder_with_outer_ref`] which only - /// matches when the placeholder IS the expression (modulo aliases), this - /// function finds placeholders buried inside function calls or other - /// transformations. For example: - /// - /// ```text - /// Alias("target_type", - /// json_as_text( - /// Column("__unnest_placeholder(...)"), ← found here - /// Literal("type"))) - /// ``` - fn find_unnest_placeholder_in_expr(expr: &Expr) -> Option { - let mut result = None; - let _ = expr.apply(|e| { - if let Expr::Column(Column { name, .. }) = e - && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) - { - result = if prefix - .starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) - { - Some(UnnestInputType::OuterReference) - } else { - Some(UnnestInputType::Scalar) - }; - return Ok(TreeNodeRecursion::Stop); - } - Ok(TreeNodeRecursion::Continue) - }); - result - } - /// Extract the outermost user-provided alias from an unnest expression. /// Returns `None` if the outermost alias is DataFusion's internal display /// name (e.g. `UNNEST(make_array(...))`), or if there is no alias at all. @@ -1463,7 +1382,7 @@ impl Unparser<'_> { user_alias: Option<&str>, ) -> ast::SelectItem { let compound = ast::Expr::CompoundIdentifier(vec![ - self.new_ident_without_quote_style(flatten_alias.to_string()), + self.new_ident_quoted_if_needs(flatten_alias.to_string()), Ident::with_quote('"', "VALUE"), ]); match user_alias { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 0e4ead95509a8..1f9136b143826 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3004,7 +3004,7 @@ fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> { sql: "SELECT * FROM UNNEST([1,2,3])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, ); Ok(()) } @@ -3016,7 +3016,7 @@ fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusio sql: "SELECT * FROM UNNEST([1,2,3]), j1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest CROSS JOIN "j1""#, + expected: @r#"SELECT "_unnest"."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest" CROSS JOIN "j1""#, ); Ok(()) } @@ -3048,7 +3048,7 @@ fn snowflake_flatten_implicit_from() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @"SELECT _unnest.\"VALUE\" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest", + expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, ); Ok(()) } @@ -3061,7 +3061,7 @@ fn snowflake_flatten_string_array() -> Result<(), DataFusionError> { sql: "SELECT * FROM UNNEST(['a','b','c'])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @"SELECT _unnest.\"VALUE\" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS _unnest", + expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS "_unnest""#, ); Ok(()) } @@ -3073,7 +3073,7 @@ fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3]) as c1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + expected: @r#"SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, ); Ok(()) } @@ -3085,7 +3085,7 @@ fn snowflake_flatten_select_unnest_plus_literal() -> Result<(), DataFusionError> sql: "SELECT UNNEST([1,2,3]), 1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#, + expected: @r#"SELECT "_unnest"."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, ); Ok(()) } @@ -3113,7 +3113,7 @@ fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> { sql: "SELECT UNNEST(array_col) FROM (SELECT array_col FROM unnest_table WHERE array_col IS NOT NULL LIMIT 3)", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT _unnest."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS _unnest"#, + expected: @r#"SELECT "_unnest"."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS "_unnest""#, ); Ok(()) } @@ -3180,7 +3180,7 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS "_unnest" LIMIT 5"#); Ok(()) } @@ -3210,7 +3210,7 @@ fn snowflake_flatten_limit_between_projection_and_unnest() -> Result<(), DataFus let actual = result.to_string(); // Must contain LATERAL FLATTEN — the Limit must not prevent FLATTEN detection - insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS _unnest LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest" LIMIT 5"#); Ok(()) } From 455fc83ec8deb265f01c0b63a6c0e6108f1500fe Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Mon, 13 Apr 2026 08:20:39 -0400 Subject: [PATCH 12/14] Unnest fixes for multi-arguments --- datafusion/sql/src/unparser/plan.rs | 36 ++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 81 +++++++++++++++++++++++ 2 files changed, 103 insertions(+), 14 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 239b2b5a8b70b..313a7e26d5d22 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -421,20 +421,25 @@ impl Unparser<'_> { } // Projection can be top-level plan for unnest relation. - // The projection generated by the `RecursiveUnnestRewriter` - // will have exactly one expression referencing the unnest - // placeholder column. The placeholder may be at the top level - // ("bare": `__unnest_placeholder(...) AS item`) or wrapped - // inside a function call ("wrapped": - // `json_as_text(__unnest_placeholder(...), 'type') AS t`). - let (unnest_input_type, placeholder_is_bare) = if p.expr.len() == 1 { - match Self::find_unnest_placeholder(&p.expr[0]) { - Some((t, is_bare)) => (Some(t), is_bare), - None => (None, false), - } - } else { - (None, false) - }; + // At least one expression will reference the unnest + // placeholder column. The placeholder may be: + // - "bare": the sole expression IS the placeholder (+ aliases) + // - "wrapped": inside a function call, or one of several exprs + // When bare, we emit `_unnest."VALUE" [AS alias]`. + // Otherwise `reconstruct_select_statement` renders all + // expressions and rewrites the placeholder via + // `unproject_unnest_expr_as_flatten_value`. + let (unnest_input_type, placeholder_is_bare) = p + .expr + .iter() + .find_map(Self::find_unnest_placeholder) + .map(|(t, is_bare)| { + // Only bare when there is a single expression that + // IS the placeholder — multi-expression projections + // always need reconstruct_select_statement. + (Some(t), is_bare && p.expr.len() == 1) + }) + .unwrap_or((None, false)); // Extract the outermost user alias (e.g. "c1" from `UNNEST(...) AS c1`). // Internal aliases like "UNNEST(...)" are not user aliases. let user_alias = if placeholder_is_bare && unnest_input_type.is_some() { @@ -524,8 +529,11 @@ impl Unparser<'_> { return Ok(()); } // Standard UNNEST table factor path (BigQuery, etc.). + // Only fires for single-expression projections — multi-expr + // falls through to reconstruct_select_statement. if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() + && p.expr.len() == 1 && user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases && let Some((unnest, unnest_plan)) = self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 1f9136b143826..bff9696e64bee 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3360,6 +3360,87 @@ fn snowflake_flatten_composed_expression_with_limit() -> Result<(), DataFusionEr Ok(()) } +#[test] +fn snowflake_flatten_multi_expression_projection() -> Result<(), DataFusionError> { + // Build: Projection([CAST(placeholder AS Int64) AS a, CAST(placeholder AS Utf8) AS b]) + // → Unnest → Projection → TableScan + // The outer Projection has TWO expressions — both reference the placeholder. + // The FLATTEN code path must fire even when p.expr.len() > 1. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("a"), + cast(col("__unnest_placeholder(items)"), DataType::Utf8).alias("b"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + // Both expressions should be present + assert!( + actual.contains("CAST"), + "Expected CAST in SQL, got: {actual}" + ); + assert!( + actual.contains(r#"AS "a""#), + "Expected alias 'a' in SQL, got: {actual}" + ); + assert!( + actual.contains(r#"AS "b""#), + "Expected alias 'b' in SQL, got: {actual}" + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_multi_expression_with_limit() -> Result<(), DataFusionError> { + // Multi-expression + Limit between Projection and Unnest + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(10))? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("a"), + cast(col("__unnest_placeholder(items)"), DataType::Utf8).alias("b"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + assert!( + actual.contains("LATERAL FLATTEN"), + "Expected LATERAL FLATTEN in SQL, got: {actual}" + ); + assert!( + actual.contains("LIMIT 10"), + "Expected LIMIT 10 in SQL, got: {actual}" + ); + Ok(()) +} + #[test] fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { // Build: Projection → Unnest → SubqueryAlias → Projection → TableScan From 5f1d805b4e7400b8ad1aa3dab770bc93431fee1d Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Mon, 13 Apr 2026 13:06:48 -0400 Subject: [PATCH 13/14] Few more fixes --- datafusion/sql/src/unparser/plan.rs | 240 ++++++++++------------ datafusion/sql/tests/cases/plan_to_sql.rs | 22 +- 2 files changed, 116 insertions(+), 146 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 313a7e26d5d22..29e8a4a627b49 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -298,29 +298,15 @@ impl Unparser<'_> { let items = exprs .iter() .map(|e| { - if use_flatten { - // For Snowflake FLATTEN: rewrite UNNEST output columns - // to `_unnest."VALUE"`. After unproject_unnest_expr_as_flatten_value - // runs, the expression is either: - // - bare Column with UNNEST prefix (outer ref case) - // - Alias { name: "UNNEST(...)", expr: Column(_unnest.VALUE) } - // (the inner column is already rewritten but the alias preserves - // the internal UNNEST display name) - let is_unnest_col = match e { - Expr::Column(col) => col - .name - .starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")), - Expr::Alias(Alias { name, .. }) => { - name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) - } - _ => false, - }; - if is_unnest_col { - return Ok(self.build_flatten_value_select_item( - FLATTEN_DEFAULT_ALIAS, - None, - )); - } + // After unproject_unnest_expr_as_flatten_value, an + // internal UNNEST display-name alias may still wrap + // the rewritten _unnest.VALUE column. Replace it + // with the bare FLATTEN VALUE select item. + if use_flatten && Self::has_internal_unnest_alias(e) { + return Ok(self.build_flatten_value_select_item( + FLATTEN_DEFAULT_ALIAS, + None, + )); } self.select_item_to_sql(e) }) @@ -421,38 +407,47 @@ impl Unparser<'_> { } // Projection can be top-level plan for unnest relation. - // At least one expression will reference the unnest - // placeholder column. The placeholder may be: - // - "bare": the sole expression IS the placeholder (+ aliases) - // - "wrapped": inside a function call, or one of several exprs - // When bare, we emit `_unnest."VALUE" [AS alias]`. - // Otherwise `reconstruct_select_statement` renders all - // expressions and rewrites the placeholder via - // `unproject_unnest_expr_as_flatten_value`. - let (unnest_input_type, placeholder_is_bare) = p - .expr - .iter() - .find_map(Self::find_unnest_placeholder) - .map(|(t, is_bare)| { - // Only bare when there is a single expression that - // IS the placeholder — multi-expression projections - // always need reconstruct_select_statement. - (Some(t), is_bare && p.expr.len() == 1) - }) - .unwrap_or((None, false)); - // Extract the outermost user alias (e.g. "c1" from `UNNEST(...) AS c1`). - // Internal aliases like "UNNEST(...)" are not user aliases. - let user_alias = if placeholder_is_bare && unnest_input_type.is_some() { - Self::extract_unnest_user_alias(&p.expr[0]) - } else { - None - }; - // Snowflake LATERAL FLATTEN path. + // The projection generated by the `RecursiveUnnestRewriter` + // will have at least one expression referencing an unnest + // placeholder column. + let unnest_input_type: Option = + p.expr.iter().find_map(Self::find_unnest_placeholder); + + // --- UNNEST table factor path (BigQuery, etc.) --- + // Only fires for a single bare-placeholder projection. + // Uses peel_to_unnest_with_modifiers (rather than matching + // p.input directly) to handle Limit/Sort between Projection + // and Unnest. + if self.dialect.unnest_as_table_factor() + && p.expr.len() == 1 + && Self::is_bare_unnest_placeholder(&p.expr[0]) + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? + && let Some(unnest_relation) = + self.try_unnest_to_table_factor_sql(unnest)? + { + relation.unnest(unnest_relation); + return self.select_to_sql_recursively( + unnest_plan, + query, + select, + relation, + ); + } + + // --- Snowflake LATERAL FLATTEN path --- // `peel_to_unnest_with_modifiers` walks through any - // intermediate Limit/Sort nodes, applies their modifiers to - // the query, and returns the Unnest + the LogicalPlan ref to - // recurse into (bypassing the normal Limit/Sort handlers that - // would create a derived subquery). + // intermediate Limit/Sort nodes (the optimizer can insert + // these between the Projection and the Unnest), applies + // their modifiers to the query, and returns the Unnest + + // the LogicalPlan ref to recurse into. This bypasses the + // normal Limit/Sort handlers which would wrap the subtree + // in a derived subquery. + // SELECT rendering is delegated to + // `reconstruct_select_statement`, which rewrites + // placeholder columns to `"_unnest"."VALUE"` via + // `unproject_unnest_expr_as_flatten_value` — this works + // for bare, wrapped, and multi-expression projections. if self.dialect.unnest_as_lateral_flatten() && unnest_input_type.is_some() && let Some((unnest, unnest_plan)) = @@ -469,28 +464,17 @@ impl Unparser<'_> { )) })?; - // Set the SELECT projection. + // An outer plan (e.g. a wrapping Projection) may have + // already set SELECT columns; only set them once. if !select.already_projected() { - if placeholder_is_bare { - let value_expr = self.build_flatten_value_select_item( - flatten.alias_name(), - user_alias.as_deref(), - ); - select.projection(vec![value_expr]); - } else { - // Composed expression wrapping the placeholder: - // reconstruct_select_statement rewrites the - // placeholder column to `_unnest."VALUE"` via - // unproject_unnest_expr_as_flatten_value. - self.reconstruct_select_statement(plan, p, select)?; - } + self.reconstruct_select_statement(plan, p, select)?; } if matches!( inner_projection.input.as_ref(), LogicalPlan::EmptyRelation(_) ) { - // Inline array case (e.g. UNNEST([1,2,3])): + // Inline array (e.g. UNNEST([1,2,3])): // FLATTEN is the sole FROM source. relation.flatten(flatten); return self.select_to_sql_recursively( @@ -502,9 +486,8 @@ impl Unparser<'_> { } // Non-empty source (table, subquery, etc.): - // Recurse into the Unnest → inner source to set the - // primary FROM relation, then add the FLATTEN as a - // CROSS JOIN. + // recurse to set the primary FROM, then attach FLATTEN + // as a CROSS JOIN. self.select_to_sql_recursively(unnest_plan, query, select, relation)?; let flatten_factor = flatten.build().map_err(|e| { @@ -528,26 +511,6 @@ impl Unparser<'_> { return Ok(()); } - // Standard UNNEST table factor path (BigQuery, etc.). - // Only fires for single-expression projections — multi-expr - // falls through to reconstruct_select_statement. - if self.dialect.unnest_as_table_factor() - && unnest_input_type.is_some() - && p.expr.len() == 1 - && user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases - && let Some((unnest, unnest_plan)) = - self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? - && let Some(unnest_relation) = - self.try_unnest_to_table_factor_sql(unnest)? - { - relation.unnest(unnest_relation); - return self.select_to_sql_recursively( - unnest_plan, - query, - select, - relation, - ); - } // If it's a unnest projection, we should provide the table column alias // to provide a column name for the unnest relation. @@ -1151,24 +1114,22 @@ impl Unparser<'_> { } } LogicalPlan::Unnest(unnest) => { - if self.dialect.unnest_as_lateral_flatten() - && !unnest.struct_type_columns.is_empty() - { - return not_impl_err!( - "Snowflake FLATTEN cannot unparse struct unnest: \ - DataFusion expands struct fields into columns (horizontal), \ - but Snowflake FLATTEN expands them into rows (vertical). \ - Columns: {:?}", + if !unnest.struct_type_columns.is_empty() { + if self.dialect.unnest_as_lateral_flatten() { + return not_impl_err!( + "Snowflake FLATTEN cannot unparse struct unnest: \ + DataFusion expands struct fields into columns (horizontal), \ + but Snowflake FLATTEN expands them into rows (vertical). \ + Columns: {:?}", + unnest.struct_type_columns + ); + } + return internal_err!( + "Struct type columns are not currently supported in UNNEST: {:?}", unnest.struct_type_columns ); } - assert_or_internal_err!( - unnest.struct_type_columns.is_empty(), - "Struct type columns are not currently supported in UNNEST: {:?}", - unnest.struct_type_columns - ); - // For Snowflake FLATTEN: if the relation hasn't been set yet // (UNNEST was in SELECT clause, not FROM clause), set the FLATTEN // relation here so the FROM clause is emitted. @@ -1293,27 +1254,13 @@ impl Unparser<'_> { } } - /// Search an expression for an unnest placeholder column reference. + /// Search an expression tree for an unnest placeholder column reference. /// - /// Returns both the [`UnnestInputType`] and whether the placeholder is - /// "bare" (the expression IS the placeholder, modulo aliases) or - /// "wrapped" (the placeholder is inside a function call or other - /// transformation). - /// - /// Examples: - /// - `Alias("item", Column("__unnest_placeholder(...)"))` → `Some((Scalar, true))` - /// - `Alias("t", Cast(Column("__unnest_placeholder(...)"), Int64))` → `Some((Scalar, false))` - /// - `Column("unrelated")` → `None` - fn find_unnest_placeholder(expr: &Expr) -> Option<(UnnestInputType, bool)> { - // Fast path: check if the expression IS the placeholder (peel aliases). - let mut inner = expr; - while let Expr::Alias(Alias { expr, .. }) = inner { - inner = expr.as_ref(); - } - if let Some(t) = Self::classify_placeholder_column(inner) { - return Some((t, true)); - } - // Slow path: walk the full expression tree. + /// Returns the [`UnnestInputType`] if any sub-expression is a column + /// whose name starts with `__unnest_placeholder`. The placeholder may + /// be at the top level (bare), inside a function call, or one of several + /// expressions — this function finds it regardless. + fn find_unnest_placeholder(expr: &Expr) -> Option { let mut result = None; let _ = expr.apply(|e| { if let Some(t) = Self::classify_placeholder_column(e) { @@ -1322,7 +1269,22 @@ impl Unparser<'_> { } Ok(TreeNodeRecursion::Continue) }); - result.map(|t| (t, false)) + result + } + + /// Returns true if `expr` is a placeholder column, optionally wrapped + /// in a single alias (the rewriter's internal `UNNEST(...)` name). + /// Does NOT match when a user alias wraps the internal alias + /// (e.g. `Alias("c1", Alias("UNNEST(...)", Column(placeholder)))`), + /// so the table-factor path correctly falls through to + /// `reconstruct_select_statement` which preserves user aliases. + fn is_bare_unnest_placeholder(expr: &Expr) -> bool { + // Peel at most one alias layer (the rewriter's internal name). + let inner = match expr { + Expr::Alias(Alias { expr, .. }) => expr.as_ref(), + other => other, + }; + Self::classify_placeholder_column(inner).is_some() } /// If `expr` is a `Column` whose name starts with `__unnest_placeholder`, @@ -1340,17 +1302,23 @@ impl Unparser<'_> { None } - /// Extract the outermost user-provided alias from an unnest expression. - /// Returns `None` if the outermost alias is DataFusion's internal display - /// name (e.g. `UNNEST(make_array(...))`), or if there is no alias at all. - fn extract_unnest_user_alias(expr: &Expr) -> Option { - if let Expr::Alias(Alias { name, .. }) = expr { - // Internal aliases start with "UNNEST(" — user aliases don't. - if !name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) { - return Some(name.clone()); + /// Check whether an expression carries an internal `UNNEST(...)` display + /// name as its column name or outermost alias. After + /// [`unproject_unnest_expr_as_flatten_value`] rewrites the placeholder + /// column to `_unnest.VALUE`, the internal alias may still linger + /// (e.g. `Alias("UNNEST(make_array(...))", Column("_unnest.VALUE"))`). + /// Callers use this to replace the expression with a clean + /// `_unnest."VALUE"` select item. + fn has_internal_unnest_alias(expr: &Expr) -> bool { + match expr { + Expr::Column(col) => { + col.name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) + } + Expr::Alias(Alias { name, .. }) => { + name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) } + _ => false, } - None } fn try_unnest_to_table_factor_sql( diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index bff9696e64bee..fac1b2cb766f3 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3119,12 +3119,14 @@ fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> { } /// Dummy scalar UDF for testing — takes a string and returns List. +/// Simulates any UDF that extracts an array from a column (e.g. parsing +/// JSON, splitting a delimited string, etc.). #[derive(Debug, PartialEq, Eq, Hash)] -struct JsonGetArrayUdf { +struct ExtractArrayUdf { signature: Signature, } -impl JsonGetArrayUdf { +impl ExtractArrayUdf { fn new() -> Self { Self { signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable), @@ -3132,9 +3134,9 @@ impl JsonGetArrayUdf { } } -impl ScalarUDFImpl for JsonGetArrayUdf { +impl ScalarUDFImpl for ExtractArrayUdf { fn name(&self) -> &str { - "json_get_array" + "extract_array" } fn signature(&self) -> &Signature { &self.signature @@ -3152,10 +3154,10 @@ impl ScalarUDFImpl for JsonGetArrayUdf { #[test] fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { - // UNNEST on a UDF result: json_get_array(col) returns List, - // then UNNEST flattens it. This simulates a common Snowflake pattern - // where a UDF parses JSON into an array, then FLATTEN expands it. - let sql = "SELECT UNNEST(json_get_array(j1_string)) AS items FROM j1 LIMIT 5"; + // UNNEST on a UDF result: extract_array(col) returns List, + // then UNNEST flattens it. This exercises the path where the FLATTEN + // INPUT is a UDF call rather than a bare column reference. + let sql = "SELECT UNNEST(extract_array(j1_string)) AS items FROM j1 LIMIT 5"; let statement = Parser::new(&GenericDialect {}) .try_with_sql(sql)? @@ -3164,7 +3166,7 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { let state = MockSessionState::default() .with_aggregate_function(max_udaf()) .with_aggregate_function(min_udaf()) - .with_scalar_function(Arc::new(ScalarUDF::new_from_impl(JsonGetArrayUdf::new()))) + .with_scalar_function(Arc::new(ScalarUDF::new_from_impl(ExtractArrayUdf::new()))) .with_expr_planner(Arc::new(CoreFunctionPlanner::default())) .with_expr_planner(Arc::new(NestedFunctionPlanner)) .with_expr_planner(Arc::new(FieldAccessPlanner)); @@ -3180,7 +3182,7 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS "_unnest" LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => extract_array("j1"."j1_string")) AS "_unnest" LIMIT 5"#); Ok(()) } From 8e83e17b198b73cc258240027713ba34f50ebf5b Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 17 Apr 2026 12:47:06 -0400 Subject: [PATCH 14/14] Generate unique LATERAL FLATTEN aliases per query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the hardcoded FLATTEN_DEFAULT_ALIAS ("_unnest") with a per-SelectBuilder counter that generates unique aliases (_unnest_1, _unnest_2, …). This prevents alias collisions when multiple unnests appear in the same query. - Add flatten_alias_counter to SelectBuilder with next/current accessor methods, scoped to one SELECT so subqueries get independent counters - Remove FLATTEN_DEFAULT_ALIAS constant, the dead alias_name() method, and the default alias from FlattenRelationBuilder - All three FLATTEN code paths (placeholder projection, display-name projection, and Unnest handler) now coordinate through the SelectBuilder to ensure SELECT items and FROM clause use the same alias - Use internal_datafusion_err! macro for FLATTEN error handling - Migrate unnest tests from partial .contains() assertions to insta::assert_snapshot! for full SQL verification --- datafusion/sql/src/unparser/ast.rs | 39 ++-- datafusion/sql/src/unparser/plan.rs | 229 +++++++++++++--------- datafusion/sql/tests/cases/plan_to_sql.rs | 90 ++------- 3 files changed, 174 insertions(+), 184 deletions(-) diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index 9b3fd4151d946..7db2d96c41df5 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -162,9 +162,28 @@ pub struct SelectBuilder { qualify: Option, value_table_mode: Option, flavor: Option, + /// Counter for generating unique LATERAL FLATTEN aliases within this SELECT. + flatten_alias_counter: usize, } impl SelectBuilder { + /// Generate a unique alias for a LATERAL FLATTEN relation + /// (`_unnest_1`, `_unnest_2`, …). Each call returns a fresh name. + pub fn next_flatten_alias(&mut self) -> String { + self.flatten_alias_counter += 1; + format!("_unnest_{}", self.flatten_alias_counter) + } + + /// Returns the most recently generated flatten alias, or `None` if + /// `next_flatten_alias` has not been called yet. + pub fn current_flatten_alias(&self) -> Option { + if self.flatten_alias_counter > 0 { + Some(format!("_unnest_{}", self.flatten_alias_counter)) + } else { + None + } + } + pub fn distinct(&mut self, value: Option) -> &mut Self { self.distinct = value; self @@ -371,6 +390,7 @@ impl SelectBuilder { qualify: Default::default(), value_table_mode: Default::default(), flavor: Some(SelectFlavor::Standard), + flatten_alias_counter: 0, } } } @@ -697,10 +717,6 @@ impl Default for UnnestRelationBuilder { } } -/// Default table alias for FLATTEN table factors. -/// Snowflake requires an alias to reference output columns (e.g. `_unnest.VALUE`). -pub const FLATTEN_DEFAULT_ALIAS: &str = "_unnest"; - /// Builds a `LATERAL FLATTEN(INPUT => expr, OUTER => bool)` table factor /// for Snowflake-style unnesting. #[derive(Clone)] @@ -757,22 +773,9 @@ impl FlattenRelationBuilder { }) } - /// Returns the alias name for this FLATTEN relation. - /// Used to build qualified column references like `alias.VALUE`. - pub fn alias_name(&self) -> &str { - self.alias - .as_ref() - .map(|a| a.name.value.as_str()) - .unwrap_or(FLATTEN_DEFAULT_ALIAS) - } - fn create_empty() -> Self { Self { - alias: Some(ast::TableAlias { - name: ast::Ident::with_quote('"', FLATTEN_DEFAULT_ALIAS), - columns: vec![], - explicit: true, - }), + alias: None, input_expr: None, outer: false, } diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 29e8a4a627b49..16641510fb5ef 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -38,13 +38,12 @@ use crate::unparser::extension_unparser::{ }; use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs}; use crate::unparser::{ - ast::FLATTEN_DEFAULT_ALIAS, ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, - rewrite::rewrite_qualify, + ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, rewrite::rewrite_qualify, }; use crate::utils::UNNEST_PLACEHOLDER; use datafusion_common::{ Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err, - internal_err, not_impl_err, + internal_datafusion_err, internal_err, not_impl_err, tree_node::{TransformedResult, TreeNode, TreeNodeRecursion}, }; use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX}; @@ -239,17 +238,12 @@ impl Unparser<'_> { let mut exprs = p.expr.clone(); // If an Unnest node is found within the select, find and unproject the unnest column + let flatten_alias = select.current_flatten_alias(); if let Some(unnest) = find_unnest_node_within_select(plan) { - if self.dialect.unnest_as_lateral_flatten() { + if let Some(ref alias) = flatten_alias { exprs = exprs .into_iter() - .map(|e| { - unproject_unnest_expr_as_flatten_value( - e, - unnest, - FLATTEN_DEFAULT_ALIAS, - ) - }) + .map(|e| unproject_unnest_expr_as_flatten_value(e, unnest, alias)) .collect::>>()?; } else { exprs = exprs @@ -294,7 +288,6 @@ impl Unparser<'_> { select.projection(items); } _ => { - let use_flatten = self.dialect.unnest_as_lateral_flatten(); let items = exprs .iter() .map(|e| { @@ -302,11 +295,10 @@ impl Unparser<'_> { // internal UNNEST display-name alias may still wrap // the rewritten _unnest.VALUE column. Replace it // with the bare FLATTEN VALUE select item. - if use_flatten && Self::has_internal_unnest_alias(e) { - return Ok(self.build_flatten_value_select_item( - FLATTEN_DEFAULT_ALIAS, - None, - )); + if let Some(ref alias) = flatten_alias + && Self::has_internal_unnest_alias(e) + { + return Ok(self.build_flatten_value_select_item(alias, None)); } self.select_item_to_sql(e) }) @@ -360,6 +352,105 @@ impl Unparser<'_> { } } + /// Projection unparsing when [`super::dialect::Dialect::unnest_as_lateral_flatten`] is enabled: + /// Snowflake-style `LATERAL FLATTEN` for unnest (not other dialect spellings). + /// + /// [`Self::peel_to_unnest_with_modifiers`] walks through any intermediate + /// Limit/Sort nodes (the optimizer can insert these between the Projection + /// and the Unnest), applies their modifiers to the query, and returns the + /// Unnest plus the [`LogicalPlan`] ref to recurse into. This bypasses the + /// normal Limit/Sort handlers which would wrap the subtree in a derived + /// subquery. + /// + /// SELECT rendering is delegated to [`Self::reconstruct_select_statement`], + /// which rewrites placeholder columns to `alias."VALUE"` via + /// [`unproject_unnest_expr_as_flatten_value`]. + /// + /// Returns `Ok(true)` when this path fully handled the projection. + fn try_projection_unnest_as_lateral_flatten( + &self, + plan: &LogicalPlan, + p: &Projection, + query: &mut Option, + select: &mut SelectBuilder, + relation: &mut RelationBuilder, + unnest_input_type: Option<&UnnestInputType>, + ) -> Result { + // unnest_as_lateral_flatten: Snowflake LATERAL FLATTEN + if self.dialect.unnest_as_lateral_flatten() + && unnest_input_type.is_some() + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? + && let Some(mut flatten) = self.try_unnest_to_lateral_flatten_sql(unnest)? + { + let inner_projection = Self::peel_to_inner_projection(unnest.input.as_ref()) + .ok_or_else(|| { + internal_datafusion_err!( + "Unnest input is not a Projection: {:?}", + unnest.input + ) + })?; + + // Generate a unique alias for this FLATTEN so that + // multiple unnests in the same query don't collide. + // When the SELECT was already built by an outer Projection + // (already_projected), it already called + // next_flatten_alias(), so we reuse that alias. + if !select.already_projected() { + let flatten_alias_name = select.next_flatten_alias(); + flatten.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &flatten_alias_name), + columns: vec![], + explicit: true, + })); + self.reconstruct_select_statement(plan, p, select)?; + } else if let Some(alias) = select.current_flatten_alias() { + flatten.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &alias), + columns: vec![], + explicit: true, + })); + } + + if matches!( + inner_projection.input.as_ref(), + LogicalPlan::EmptyRelation(_) + ) { + // Inline array (e.g. UNNEST([1,2,3])): + // FLATTEN is the sole FROM source. + relation.flatten(flatten); + self.select_to_sql_recursively(unnest_plan, query, select, relation)?; + return Ok(true); + } + + // Non-empty source (table, subquery, etc.): + // recurse to set the primary FROM, then attach FLATTEN + // as a CROSS JOIN. + self.select_to_sql_recursively(unnest_plan, query, select, relation)?; + + let flatten_factor = flatten + .build() + .map_err(|e| internal_datafusion_err!("Failed to build FLATTEN: {e}"))?; + let cross_join = ast::Join { + relation: flatten_factor, + global: false, + join_operator: ast::JoinOperator::CrossJoin(ast::JoinConstraint::None), + }; + if let Some(mut from) = select.pop_from() { + from.push_join(cross_join); + select.push_from(from); + } else { + let mut twj = TableWithJoinsBuilder::default(); + twj.push_join(cross_join); + select.push_from(twj); + } + + return Ok(true); + } + + Ok(false) + } + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn select_to_sql_recursively( &self, @@ -435,80 +526,14 @@ impl Unparser<'_> { ); } - // --- Snowflake LATERAL FLATTEN path --- - // `peel_to_unnest_with_modifiers` walks through any - // intermediate Limit/Sort nodes (the optimizer can insert - // these between the Projection and the Unnest), applies - // their modifiers to the query, and returns the Unnest + - // the LogicalPlan ref to recurse into. This bypasses the - // normal Limit/Sort handlers which would wrap the subtree - // in a derived subquery. - // SELECT rendering is delegated to - // `reconstruct_select_statement`, which rewrites - // placeholder columns to `"_unnest"."VALUE"` via - // `unproject_unnest_expr_as_flatten_value` — this works - // for bare, wrapped, and multi-expression projections. - if self.dialect.unnest_as_lateral_flatten() - && unnest_input_type.is_some() - && let Some((unnest, unnest_plan)) = - self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? - && let Some(flatten) = - self.try_unnest_to_lateral_flatten_sql(unnest)? - { - let inner_projection = - Self::peel_to_inner_projection(unnest.input.as_ref()) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Unnest input is not a Projection: {:?}", - unnest.input - )) - })?; - - // An outer plan (e.g. a wrapping Projection) may have - // already set SELECT columns; only set them once. - if !select.already_projected() { - self.reconstruct_select_statement(plan, p, select)?; - } - - if matches!( - inner_projection.input.as_ref(), - LogicalPlan::EmptyRelation(_) - ) { - // Inline array (e.g. UNNEST([1,2,3])): - // FLATTEN is the sole FROM source. - relation.flatten(flatten); - return self.select_to_sql_recursively( - unnest_plan, - query, - select, - relation, - ); - } - - // Non-empty source (table, subquery, etc.): - // recurse to set the primary FROM, then attach FLATTEN - // as a CROSS JOIN. - self.select_to_sql_recursively(unnest_plan, query, select, relation)?; - - let flatten_factor = flatten.build().map_err(|e| { - DataFusionError::Internal(format!("Failed to build FLATTEN: {e}")) - })?; - let cross_join = ast::Join { - relation: flatten_factor, - global: false, - join_operator: ast::JoinOperator::CrossJoin( - ast::JoinConstraint::None, - ), - }; - if let Some(mut from) = select.pop_from() { - from.push_join(cross_join); - select.push_from(from); - } else { - let mut twj = TableWithJoinsBuilder::default(); - twj.push_join(cross_join); - select.push_from(twj); - } - + if self.try_projection_unnest_as_lateral_flatten( + plan, + p, + query, + select, + relation, + unnest_input_type.as_ref(), + )? { return Ok(()); } @@ -536,6 +561,16 @@ impl Unparser<'_> { columns, ); } + // For Snowflake FLATTEN: when the outer Projection has + // UNNEST(...) display-name columns (from SELECT * / SELECT + // UNNEST(...)), generate a flatten alias now so that + // reconstruct_select_statement and the downstream Unnest + // handler both use the same alias. + if self.dialect.unnest_as_lateral_flatten() + && p.expr.iter().any(Self::has_internal_unnest_alias) + { + select.next_flatten_alias(); + } self.reconstruct_select_statement(plan, p, select)?; self.select_to_sql_recursively(p.input.as_ref(), query, select, relation) } @@ -1135,9 +1170,19 @@ impl Unparser<'_> { // relation here so the FROM clause is emitted. if self.dialect.unnest_as_lateral_flatten() && !relation.has_relation() - && let Some(flatten_relation) = + && let Some(mut flatten_relation) = self.try_unnest_to_lateral_flatten_sql(unnest)? { + // Use the alias already generated by the Projection + // handler so SELECT items and the FLATTEN relation + // reference the same name. + if let Some(alias) = select.current_flatten_alias() { + flatten_relation.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &alias), + columns: vec![], + explicit: true, + })); + } relation.flatten(flatten_relation); } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index fac1b2cb766f3..db1924f9e11c7 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -3004,7 +3004,7 @@ fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> { sql: "SELECT * FROM UNNEST([1,2,3])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, ); Ok(()) } @@ -3016,7 +3016,7 @@ fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), DataFusio sql: "SELECT * FROM UNNEST([1,2,3]), j1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest" CROSS JOIN "j1""#, + expected: @r#"SELECT "_unnest_1"."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1" CROSS JOIN "j1""#, ); Ok(()) } @@ -3048,7 +3048,7 @@ fn snowflake_flatten_implicit_from() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, ); Ok(()) } @@ -3061,7 +3061,7 @@ fn snowflake_flatten_string_array() -> Result<(), DataFusionError> { sql: "SELECT * FROM UNNEST(['a','b','c'])", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS "_unnest_1""#, ); Ok(()) } @@ -3073,7 +3073,7 @@ fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { sql: "SELECT UNNEST([1,2,3]) as c1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, ); Ok(()) } @@ -3085,7 +3085,7 @@ fn snowflake_flatten_select_unnest_plus_literal() -> Result<(), DataFusionError> sql: "SELECT UNNEST([1,2,3]), 1", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, ); Ok(()) } @@ -3113,7 +3113,7 @@ fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> { sql: "SELECT UNNEST(array_col) FROM (SELECT array_col FROM unnest_table WHERE array_col IS NOT NULL LIMIT 3)", parser_dialect: GenericDialect {}, unparser_dialect: snowflake, - expected: @r#"SELECT "_unnest"."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS "_unnest""#, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS "_unnest_1""#, ); Ok(()) } @@ -3182,7 +3182,7 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> { let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => extract_array("j1"."j1_string")) AS "_unnest" LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => extract_array("j1"."j1_string")) AS "_unnest_1" LIMIT 5"#); Ok(()) } @@ -3212,7 +3212,7 @@ fn snowflake_flatten_limit_between_projection_and_unnest() -> Result<(), DataFus let actual = result.to_string(); // Must contain LATERAL FLATTEN — the Limit must not prevent FLATTEN detection - insta::assert_snapshot!(actual, @r#"SELECT "_unnest"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest" LIMIT 5"#); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 5"#); Ok(()) } @@ -3239,14 +3239,7 @@ fn snowflake_flatten_sort_between_projection_and_unnest() -> Result<(), DataFusi let actual = result.to_string(); // Must contain LATERAL FLATTEN — the Sort must not prevent FLATTEN detection - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - assert!( - actual.contains("ORDER BY"), - "Expected ORDER BY in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" ORDER BY "__unnest_placeholder(items)" ASC NULLS FIRST"#); Ok(()) } @@ -3274,14 +3267,7 @@ fn snowflake_flatten_limit_between_projection_and_unnest_with_subquery_alias() let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - assert!( - actual.contains("LIMIT 10"), - "Expected LIMIT 10 in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 10"#); Ok(()) } @@ -3312,14 +3298,7 @@ fn snowflake_flatten_composed_expression_wrapping_unnest() -> Result<(), DataFus let actual = result.to_string(); // Must contain LATERAL FLATTEN despite the placeholder being inside CAST - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - assert!( - actual.contains("CAST"), - "Expected CAST in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "item_id" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); Ok(()) } @@ -3347,18 +3326,7 @@ fn snowflake_flatten_composed_expression_with_limit() -> Result<(), DataFusionEr let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - assert!( - actual.contains("CAST"), - "Expected CAST in SQL, got: {actual}" - ); - assert!( - actual.contains("LIMIT 5"), - "Expected LIMIT 5 in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "item_id" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 5"#); Ok(()) } @@ -3388,23 +3356,7 @@ fn snowflake_flatten_multi_expression_projection() -> Result<(), DataFusionError let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - // Both expressions should be present - assert!( - actual.contains("CAST"), - "Expected CAST in SQL, got: {actual}" - ); - assert!( - actual.contains(r#"AS "a""#), - "Expected alias 'a' in SQL, got: {actual}" - ); - assert!( - actual.contains(r#"AS "b""#), - "Expected alias 'b' in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "a", CAST("_unnest_1"."VALUE" AS VARCHAR) AS "b" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); Ok(()) } @@ -3432,14 +3384,7 @@ fn snowflake_flatten_multi_expression_with_limit() -> Result<(), DataFusionError let result = unparser.plan_to_sql(&plan)?; let actual = result.to_string(); - assert!( - actual.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {actual}" - ); - assert!( - actual.contains("LIMIT 10"), - "Expected LIMIT 10 in SQL, got: {actual}" - ); + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "a", CAST("_unnest_1"."VALUE" AS VARCHAR) AS "b" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 10"#); Ok(()) } @@ -3469,9 +3414,6 @@ fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { let sql_str = result.to_string(); // Should contain LATERAL FLATTEN, not error - assert!( - sql_str.contains("LATERAL FLATTEN"), - "Expected LATERAL FLATTEN in SQL, got: {sql_str}" - ); + insta::assert_snapshot!(sql_str, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); Ok(()) }