diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index 8446a44b07e35..7db2d96c41df5 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -162,9 +162,28 @@ pub struct SelectBuilder { qualify: Option, value_table_mode: Option, flavor: Option, + /// Counter for generating unique LATERAL FLATTEN aliases within this SELECT. + flatten_alias_counter: usize, } impl SelectBuilder { + /// Generate a unique alias for a LATERAL FLATTEN relation + /// (`_unnest_1`, `_unnest_2`, …). Each call returns a fresh name. + pub fn next_flatten_alias(&mut self) -> String { + self.flatten_alias_counter += 1; + format!("_unnest_{}", self.flatten_alias_counter) + } + + /// Returns the most recently generated flatten alias, or `None` if + /// `next_flatten_alias` has not been called yet. + pub fn current_flatten_alias(&self) -> Option { + if self.flatten_alias_counter > 0 { + Some(format!("_unnest_{}", self.flatten_alias_counter)) + } else { + None + } + } + pub fn distinct(&mut self, value: Option) -> &mut Self { self.distinct = value; self @@ -371,6 +390,7 @@ impl SelectBuilder { qualify: Default::default(), value_table_mode: Default::default(), flavor: Some(SelectFlavor::Standard), + flatten_alias_counter: 0, } } } @@ -432,11 +452,11 @@ pub struct RelationBuilder { } #[derive(Clone)] -#[expect(clippy::large_enum_variant)] enum TableFactorBuilder { Table(TableRelationBuilder), Derived(DerivedRelationBuilder), Unnest(UnnestRelationBuilder), + Flatten(FlattenRelationBuilder), Empty, } @@ -458,6 +478,11 @@ impl RelationBuilder { self } + pub fn flatten(&mut self, value: FlattenRelationBuilder) -> &mut Self { + self.relation = Some(TableFactorBuilder::Flatten(value)); + self + } + pub fn empty(&mut self) -> &mut Self { self.relation = Some(TableFactorBuilder::Empty); self @@ -474,6 +499,9 @@ impl RelationBuilder { Some(TableFactorBuilder::Unnest(ref mut rel_builder)) => { rel_builder.alias = value; } + 
Some(TableFactorBuilder::Flatten(ref mut rel_builder)) => { + rel_builder.alias = value; + } Some(TableFactorBuilder::Empty) => (), None => (), } @@ -484,6 +512,7 @@ impl RelationBuilder { Some(TableFactorBuilder::Table(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Derived(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Unnest(ref value)) => Some(value.build()?), + Some(TableFactorBuilder::Flatten(ref value)) => Some(value.build()?), Some(TableFactorBuilder::Empty) => None, None => return Err(Into::into(UninitializedFieldError::from("relation"))), }) @@ -688,6 +717,77 @@ impl Default for UnnestRelationBuilder { } } +/// Builds a `LATERAL FLATTEN(INPUT => expr, OUTER => bool)` table factor +/// for Snowflake-style unnesting. +#[derive(Clone)] +pub struct FlattenRelationBuilder { + pub alias: Option, + /// The input expression to flatten (e.g. a column reference). + pub input_expr: Option, + /// Whether to preserve rows for NULL/empty inputs (Snowflake `OUTER` param). 
+ pub outer: bool, +} + +impl FlattenRelationBuilder { + pub fn alias(&mut self, value: Option) -> &mut Self { + self.alias = value; + self + } + + pub fn input_expr(&mut self, value: ast::Expr) -> &mut Self { + self.input_expr = Some(value); + self + } + + pub fn outer(&mut self, value: bool) -> &mut Self { + self.outer = value; + self + } + + pub fn build(&self) -> Result { + let input = self.input_expr.clone().ok_or_else(|| { + BuilderError::from(UninitializedFieldError::from("input_expr")) + })?; + + let mut args = vec![ast::FunctionArg::Named { + name: ast::Ident::new("INPUT"), + arg: ast::FunctionArgExpr::Expr(input), + operator: ast::FunctionArgOperator::RightArrow, + }]; + + if self.outer { + args.push(ast::FunctionArg::Named { + name: ast::Ident::new("OUTER"), + arg: ast::FunctionArgExpr::Expr(ast::Expr::Value( + ast::Value::Boolean(true).into(), + )), + operator: ast::FunctionArgOperator::RightArrow, + }); + } + + Ok(ast::TableFactor::Function { + lateral: true, + name: ast::ObjectName::from(vec![ast::Ident::new("FLATTEN")]), + args, + alias: self.alias.clone(), + }) + } + + fn create_empty() -> Self { + Self { + alias: None, + input_expr: None, + outer: false, + } + } +} + +impl Default for FlattenRelationBuilder { + fn default() -> Self { + Self::create_empty() + } +} + /// Runtime error when a `build()` method is called and one or more required fields /// do not have a value. #[derive(Debug, Clone)] diff --git a/datafusion/sql/src/unparser/dialect.rs b/datafusion/sql/src/unparser/dialect.rs index ee31190b68b98..d6287a2084fac 100644 --- a/datafusion/sql/src/unparser/dialect.rs +++ b/datafusion/sql/src/unparser/dialect.rs @@ -206,6 +206,15 @@ pub trait Dialect: Send + Sync { false } + /// Unparse the unnest plan as `LATERAL FLATTEN(INPUT => expr, ...)`. + /// + /// Snowflake uses FLATTEN as a table function instead of the SQL-standard UNNEST. 
+ /// When this returns `true`, the unparser emits + /// `LATERAL FLATTEN(INPUT => , OUTER => )` in the FROM clause. + fn unnest_as_lateral_flatten(&self) -> bool { + false + } + /// Allows the dialect to override column alias unparsing if the dialect has specific rules. /// Returns None if the default unparsing should be used, or Some(String) if there is /// a custom implementation for the alias. @@ -664,6 +673,59 @@ impl BigQueryDialect { } } +/// Dialect for Snowflake SQL. +/// +/// Key differences from the default dialect: +/// - Uses double-quote identifier quoting +/// - Supports `NULLS FIRST`/`NULLS LAST` in `ORDER BY` +/// - Does not support empty select lists (`SELECT FROM t`) +/// - Does not support column aliases in table alias definitions +/// (Snowflake accepts the syntax but silently ignores the renames in join contexts) +/// - Unparses `UNNEST` plans as `LATERAL FLATTEN(INPUT => expr, ...)` +pub struct SnowflakeDialect {} + +#[expect(clippy::new_without_default)] +impl SnowflakeDialect { + #[must_use] + pub fn new() -> Self { + Self {} + } +} + +impl Dialect for SnowflakeDialect { + fn identifier_quote_style(&self, _: &str) -> Option { + Some('"') + } + + fn supports_nulls_first_in_sort(&self) -> bool { + true + } + + fn supports_empty_select_list(&self) -> bool { + false + } + + fn supports_column_alias_in_table_alias(&self) -> bool { + false + } + + fn timestamp_cast_dtype( + &self, + _time_unit: &TimeUnit, + tz: &Option>, + ) -> ast::DataType { + if tz.is_some() { + ast::DataType::Timestamp(None, TimezoneInfo::WithTimeZone) + } else { + ast::DataType::Timestamp(None, TimezoneInfo::None) + } + } + + fn unnest_as_lateral_flatten(&self) -> bool { + true + } +} + pub struct CustomDialect { identifier_quote_style: Option, supports_nulls_first_in_sort: bool, @@ -686,6 +748,7 @@ pub struct CustomDialect { window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, + unnest_as_lateral_flatten: bool, } impl Default for 
CustomDialect { @@ -715,6 +778,7 @@ impl Default for CustomDialect { window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, + unnest_as_lateral_flatten: false, } } } @@ -829,6 +893,10 @@ impl Dialect for CustomDialect { fn unnest_as_table_factor(&self) -> bool { self.unnest_as_table_factor } + + fn unnest_as_lateral_flatten(&self) -> bool { + self.unnest_as_lateral_flatten + } } /// `CustomDialectBuilder` to build `CustomDialect` using builder pattern @@ -867,6 +935,7 @@ pub struct CustomDialectBuilder { window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, + unnest_as_lateral_flatten: bool, } impl Default for CustomDialectBuilder { @@ -902,6 +971,7 @@ impl CustomDialectBuilder { window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, + unnest_as_lateral_flatten: false, } } @@ -929,6 +999,7 @@ impl CustomDialectBuilder { window_func_support_window_frame: self.window_func_support_window_frame, full_qualified_col: self.full_qualified_col, unnest_as_table_factor: self.unnest_as_table_factor, + unnest_as_lateral_flatten: self.unnest_as_lateral_flatten, } } @@ -1075,4 +1146,12 @@ impl CustomDialectBuilder { self.unnest_as_table_factor = unnest_as_table_factor; self } + + pub fn with_unnest_as_lateral_flatten( + mut self, + unnest_as_lateral_flatten: bool, + ) -> Self { + self.unnest_as_lateral_flatten = unnest_as_lateral_flatten; + self + } } diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index c0b77b9dba88a..16641510fb5ef 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -29,21 +29,24 @@ use super::{ utils::{ find_agg_node_within_select, find_unnest_node_within_select, find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters, - unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs, + unproject_sort_expr, 
unproject_unnest_expr, + unproject_unnest_expr_as_flatten_value, unproject_window_exprs, }, }; use crate::unparser::extension_unparser::{ UnparseToStatementResult, UnparseWithinStatementResult, }; use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs}; -use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify}; +use crate::unparser::{ + ast::FlattenRelationBuilder, ast::UnnestRelationBuilder, rewrite::rewrite_qualify, +}; use crate::utils::UNNEST_PLACEHOLDER; use datafusion_common::{ Column, DataFusionError, Result, ScalarValue, TableReference, assert_or_internal_err, - internal_err, not_impl_err, - tree_node::{TransformedResult, TreeNode}, + internal_datafusion_err, internal_err, not_impl_err, + tree_node::{TransformedResult, TreeNode, TreeNodeRecursion}, }; -use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX; +use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX}; use datafusion_expr::{ BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest, @@ -235,11 +238,19 @@ impl Unparser<'_> { let mut exprs = p.expr.clone(); // If an Unnest node is found within the select, find and unproject the unnest column + let flatten_alias = select.current_flatten_alias(); if let Some(unnest) = find_unnest_node_within_select(plan) { - exprs = exprs - .into_iter() - .map(|e| unproject_unnest_expr(e, unnest)) - .collect::>>()?; + if let Some(ref alias) = flatten_alias { + exprs = exprs + .into_iter() + .map(|e| unproject_unnest_expr_as_flatten_value(e, unnest, alias)) + .collect::>>()?; + } else { + exprs = exprs + .into_iter() + .map(|e| unproject_unnest_expr(e, unnest)) + .collect::>>()?; + } }; match ( @@ -279,7 +290,18 @@ impl Unparser<'_> { _ => { let items = exprs .iter() - .map(|e| self.select_item_to_sql(e)) + .map(|e| { + // After unproject_unnest_expr_as_flatten_value, an + // internal UNNEST display-name alias may 
still wrap + // the rewritten _unnest.VALUE column. Replace it + // with the bare FLATTEN VALUE select item. + if let Some(ref alias) = flatten_alias + && Self::has_internal_unnest_alias(e) + { + return Ok(self.build_flatten_value_select_item(alias, None)); + } + self.select_item_to_sql(e) + }) .collect::>>()?; select.projection(items); } @@ -330,6 +352,105 @@ impl Unparser<'_> { } } + /// Projection unparsing when [`super::dialect::Dialect::unnest_as_lateral_flatten`] is enabled: + /// Snowflake-style `LATERAL FLATTEN` for unnest (not other dialect spellings). + /// + /// [`Self::peel_to_unnest_with_modifiers`] walks through any intermediate + /// Limit/Sort nodes (the optimizer can insert these between the Projection + /// and the Unnest), applies their modifiers to the query, and returns the + /// Unnest plus the [`LogicalPlan`] ref to recurse into. This bypasses the + /// normal Limit/Sort handlers which would wrap the subtree in a derived + /// subquery. + /// + /// SELECT rendering is delegated to [`Self::reconstruct_select_statement`], + /// which rewrites placeholder columns to `alias."VALUE"` via + /// [`unproject_unnest_expr_as_flatten_value`]. + /// + /// Returns `Ok(true)` when this path fully handled the projection. + fn try_projection_unnest_as_lateral_flatten( + &self, + plan: &LogicalPlan, + p: &Projection, + query: &mut Option, + select: &mut SelectBuilder, + relation: &mut RelationBuilder, + unnest_input_type: Option<&UnnestInputType>, + ) -> Result { + // unnest_as_lateral_flatten: Snowflake LATERAL FLATTEN + if self.dialect.unnest_as_lateral_flatten() + && unnest_input_type.is_some() + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? + && let Some(mut flatten) = self.try_unnest_to_lateral_flatten_sql(unnest)? 
+ { + let inner_projection = Self::peel_to_inner_projection(unnest.input.as_ref()) + .ok_or_else(|| { + internal_datafusion_err!( + "Unnest input is not a Projection: {:?}", + unnest.input + ) + })?; + + // Generate a unique alias for this FLATTEN so that + // multiple unnests in the same query don't collide. + // When the SELECT was already built by an outer Projection + // (already_projected), it already called + // next_flatten_alias(), so we reuse that alias. + if !select.already_projected() { + let flatten_alias_name = select.next_flatten_alias(); + flatten.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &flatten_alias_name), + columns: vec![], + explicit: true, + })); + self.reconstruct_select_statement(plan, p, select)?; + } else if let Some(alias) = select.current_flatten_alias() { + flatten.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &alias), + columns: vec![], + explicit: true, + })); + } + + if matches!( + inner_projection.input.as_ref(), + LogicalPlan::EmptyRelation(_) + ) { + // Inline array (e.g. UNNEST([1,2,3])): + // FLATTEN is the sole FROM source. + relation.flatten(flatten); + self.select_to_sql_recursively(unnest_plan, query, select, relation)?; + return Ok(true); + } + + // Non-empty source (table, subquery, etc.): + // recurse to set the primary FROM, then attach FLATTEN + // as a CROSS JOIN. 
+ self.select_to_sql_recursively(unnest_plan, query, select, relation)?; + + let flatten_factor = flatten + .build() + .map_err(|e| internal_datafusion_err!("Failed to build FLATTEN: {e}"))?; + let cross_join = ast::Join { + relation: flatten_factor, + global: false, + join_operator: ast::JoinOperator::CrossJoin(ast::JoinConstraint::None), + }; + if let Some(mut from) = select.pop_from() { + from.push_join(cross_join); + select.push_from(from); + } else { + let mut twj = TableWithJoinsBuilder::default(); + twj.push_join(cross_join); + select.push_from(twj); + } + + return Ok(true); + } + + Ok(false) + } + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn select_to_sql_recursively( &self, @@ -376,29 +497,46 @@ impl Unparser<'_> { .select_to_sql_recursively(&new_plan, query, select, relation); } - // Projection can be top-level plan for unnest relation - // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have - // only one expression, which is the placeholder column generated by the rewriter. - let unnest_input_type = if p.expr.len() == 1 { - Self::check_unnest_placeholder_with_outer_ref(&p.expr[0]) - } else { - None - }; + // Projection can be top-level plan for unnest relation. + // The projection generated by the `RecursiveUnnestRewriter` + // will have at least one expression referencing an unnest + // placeholder column. + let unnest_input_type: Option = + p.expr.iter().find_map(Self::find_unnest_placeholder); + + // --- UNNEST table factor path (BigQuery, etc.) --- + // Only fires for a single bare-placeholder projection. + // Uses peel_to_unnest_with_modifiers (rather than matching + // p.input directly) to handle Limit/Sort between Projection + // and Unnest. 
if self.dialect.unnest_as_table_factor() - && unnest_input_type.is_some() - && let LogicalPlan::Unnest(unnest) = &p.input.as_ref() + && p.expr.len() == 1 + && Self::is_bare_unnest_placeholder(&p.expr[0]) + && let Some((unnest, unnest_plan)) = + self.peel_to_unnest_with_modifiers(p.input.as_ref(), query)? && let Some(unnest_relation) = self.try_unnest_to_table_factor_sql(unnest)? { relation.unnest(unnest_relation); return self.select_to_sql_recursively( - p.input.as_ref(), + unnest_plan, query, select, relation, ); } + if self.try_projection_unnest_as_lateral_flatten( + plan, + p, + query, + select, + relation, + unnest_input_type.as_ref(), + )? { + return Ok(()); + } + // If it's a unnest projection, we should provide the table column alias // to provide a column name for the unnest relation. let columns = if unnest_input_type.is_some() { @@ -423,6 +561,16 @@ impl Unparser<'_> { columns, ); } + // For Snowflake FLATTEN: when the outer Projection has + // UNNEST(...) display-name columns (from SELECT * / SELECT + // UNNEST(...)), generate a flatten alias now so that + // reconstruct_select_statement and the downstream Unnest + // handler both use the same alias. + if self.dialect.unnest_as_lateral_flatten() + && p.expr.iter().any(Self::has_internal_unnest_alias) + { + select.next_flatten_alias(); + } self.reconstruct_select_statement(plan, p, select)?; self.select_to_sql_recursively(p.input.as_ref(), query, select, relation) } @@ -1001,11 +1149,42 @@ impl Unparser<'_> { } } LogicalPlan::Unnest(unnest) => { - assert_or_internal_err!( - unnest.struct_type_columns.is_empty(), - "Struct type columns are not currently supported in UNNEST: {:?}", - unnest.struct_type_columns - ); + if !unnest.struct_type_columns.is_empty() { + if self.dialect.unnest_as_lateral_flatten() { + return not_impl_err!( + "Snowflake FLATTEN cannot unparse struct unnest: \ + DataFusion expands struct fields into columns (horizontal), \ + but Snowflake FLATTEN expands them into rows (vertical). 
\ + Columns: {:?}", + unnest.struct_type_columns + ); + } + return internal_err!( + "Struct type columns are not currently supported in UNNEST: {:?}", + unnest.struct_type_columns + ); + } + + // For Snowflake FLATTEN: if the relation hasn't been set yet + // (UNNEST was in SELECT clause, not FROM clause), set the FLATTEN + // relation here so the FROM clause is emitted. + if self.dialect.unnest_as_lateral_flatten() + && !relation.has_relation() + && let Some(mut flatten_relation) = + self.try_unnest_to_lateral_flatten_sql(unnest)? + { + // Use the alias already generated by the Projection + // handler so SELECT items and the FLATTEN relation + // reference the same name. + if let Some(alias) = select.current_flatten_alias() { + flatten_relation.alias(Some(ast::TableAlias { + name: Ident::with_quote('"', &alias), + columns: vec![], + explicit: true, + })); + } + relation.flatten(flatten_relation); + } // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip. // Otherwise, there will be a duplicate SELECT clause. @@ -1014,8 +1193,9 @@ impl Unparser<'_> { // | Projection: table.col1, table.col2 AS UNNEST(table.col2) // | Filter: table.col3 = Int64(3) // | TableScan: table projection=None - if let LogicalPlan::Projection(p) = unnest.input.as_ref() { - // continue with projection input + if let Some(p) = Self::peel_to_inner_projection(unnest.input.as_ref()) { + // Skip the inner Projection (synthetic rewriter node) + // and continue with its input. 
self.select_to_sql_recursively(&p.input, query, select, relation) } else { internal_err!("Unnest input is not a Projection: {unnest:?}") @@ -1025,7 +1205,9 @@ impl Unparser<'_> { if find_unnest_node_until_relation(subquery.subquery.as_ref()) .is_some() => { - if self.dialect.unnest_as_table_factor() { + if self.dialect.unnest_as_table_factor() + || self.dialect.unnest_as_lateral_flatten() + { self.select_to_sql_recursively( subquery.subquery.as_ref(), query, @@ -1048,18 +1230,113 @@ impl Unparser<'_> { } } - /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`. + /// Walk through transparent nodes (SubqueryAlias) to find the inner + /// Projection that feeds an Unnest node. /// - /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`, - /// it means it is a scalar column, return [UnnestInputType::Scalar]. - /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`, - /// it means it is an outer reference column, return [UnnestInputType::OuterReference]. - /// - If the column is not a placeholder column, return [None]. + /// The inner Projection is created atomically by the + /// `RecursiveUnnestRewriter` and contains the array expression that the + /// Unnest operates on. A `SubqueryAlias` (e.g. from a virtual/passthrough + /// table) may wrap the Projection. + fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> { + match plan { + LogicalPlan::Projection(p) => Some(p), + LogicalPlan::SubqueryAlias(alias) => { + Self::peel_to_inner_projection(alias.input.as_ref()) + } + _ => None, + } + } + + /// Walk through transparent nodes (Limit, Sort) between the outer + /// Projection and the Unnest, applying their SQL modifiers (LIMIT, + /// OFFSET, ORDER BY) to the query builder. 
Returns the `Unnest` node + /// and a reference to the enclosing `LogicalPlan` for recursion, or + /// `Ok(None)` if no Unnest is found. + /// + /// By processing Limit/Sort inline and then recursing into the Unnest + /// plan directly, we bypass the normal Limit/Sort handlers which would + /// create unwanted derived subqueries (since `already_projected` is + /// set at the point this is called). + fn peel_to_unnest_with_modifiers<'a>( + &self, + plan: &'a LogicalPlan, + query: &mut Option, + ) -> Result> { + match plan { + LogicalPlan::Unnest(unnest) => Ok(Some((unnest, plan))), + LogicalPlan::Limit(limit) => { + if let Some(fetch) = &limit.fetch + && let Some(q) = query.as_mut() + { + q.limit(Some(self.expr_to_sql(fetch)?)); + } + if let Some(skip) = &limit.skip + && let Some(q) = query.as_mut() + { + q.offset(Some(ast::Offset { + rows: ast::OffsetRows::None, + value: self.expr_to_sql(skip)?, + })); + } + self.peel_to_unnest_with_modifiers(limit.input.as_ref(), query) + } + LogicalPlan::Sort(sort) => { + let Some(query_ref) = query.as_mut() else { + return internal_err!( + "Sort between Projection and Unnest requires a statement context." + ); + }; + if let Some(fetch) = sort.fetch { + query_ref.limit(Some(ast::Expr::value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } + query_ref.order_by(self.sorts_to_sql(&sort.expr)?); + self.peel_to_unnest_with_modifiers(sort.input.as_ref(), query) + } + _ => Ok(None), + } + } + + /// Search an expression tree for an unnest placeholder column reference. /// - /// `outer_ref` is the display result of [Expr::OuterReferenceColumn] - fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option { - if let Expr::Alias(Alias { expr, .. }) = expr - && let Expr::Column(Column { name, .. }) = expr.as_ref() + /// Returns the [`UnnestInputType`] if any sub-expression is a column + /// whose name starts with `__unnest_placeholder`. 
The placeholder may + /// be at the top level (bare), inside a function call, or one of several + /// expressions — this function finds it regardless. + fn find_unnest_placeholder(expr: &Expr) -> Option { + let mut result = None; + let _ = expr.apply(|e| { + if let Some(t) = Self::classify_placeholder_column(e) { + result = Some(t); + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + }); + result + } + + /// Returns true if `expr` is a placeholder column, optionally wrapped + /// in a single alias (the rewriter's internal `UNNEST(...)` name). + /// Does NOT match when a user alias wraps the internal alias + /// (e.g. `Alias("c1", Alias("UNNEST(...)", Column(placeholder)))`), + /// so the table-factor path correctly falls through to + /// `reconstruct_select_statement` which preserves user aliases. + fn is_bare_unnest_placeholder(expr: &Expr) -> bool { + // Peel at most one alias layer (the rewriter's internal name). + let inner = match expr { + Expr::Alias(Alias { expr, .. }) => expr.as_ref(), + other => other, + }; + Self::classify_placeholder_column(inner).is_some() + } + + /// If `expr` is a `Column` whose name starts with `__unnest_placeholder`, + /// classify it as [`UnnestInputType::OuterReference`] or + /// [`UnnestInputType::Scalar`]. + fn classify_placeholder_column(expr: &Expr) -> Option { + if let Expr::Column(Column { name, .. }) = expr && let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) { if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) { @@ -1070,6 +1347,25 @@ impl Unparser<'_> { None } + /// Check whether an expression carries an internal `UNNEST(...)` display + /// name as its column name or outermost alias. After + /// [`unproject_unnest_expr_as_flatten_value`] rewrites the placeholder + /// column to `_unnest.VALUE`, the internal alias may still linger + /// (e.g. `Alias("UNNEST(make_array(...))", Column("_unnest.VALUE"))`). 
+ /// Callers use this to replace the expression with a clean + /// `_unnest."VALUE"` select item. + fn has_internal_unnest_alias(expr: &Expr) -> bool { + match expr { + Expr::Column(col) => { + col.name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) + } + Expr::Alias(Alias { name, .. }) => { + name.starts_with(&format!("{UNNEST_COLUMN_PREFIX}(")) + } + _ => false, + } + } + fn try_unnest_to_table_factor_sql( &self, unnest: &Unnest, @@ -1100,6 +1396,51 @@ impl Unparser<'_> { Ok(Some(unnest_relation)) } + /// Build a `SELECT alias."VALUE"` item for Snowflake FLATTEN output. + fn build_flatten_value_select_item( + &self, + flatten_alias: &str, + user_alias: Option<&str>, + ) -> ast::SelectItem { + let compound = ast::Expr::CompoundIdentifier(vec![ + self.new_ident_quoted_if_needs(flatten_alias.to_string()), + Ident::with_quote('"', "VALUE"), + ]); + match user_alias { + Some(alias) => ast::SelectItem::ExprWithAlias { + expr: compound, + alias: self.new_ident_quoted_if_needs(alias.to_string()), + }, + None => ast::SelectItem::UnnamedExpr(compound), + } + } + + /// Convert an `Unnest` logical plan node to a `LATERAL FLATTEN(INPUT => expr, ...)` + /// table factor for Snowflake-style SQL output. + fn try_unnest_to_lateral_flatten_sql( + &self, + unnest: &Unnest, + ) -> Result> { + let Some(projection) = Self::peel_to_inner_projection(unnest.input.as_ref()) + else { + return Ok(None); + }; + + // For now, handle the simple case of a single expression to flatten. + // Multi-expression would require multiple LATERAL FLATTEN calls chained together. 
+ let Some(first_expr) = projection.expr.first() else { + return Ok(None); + }; + + let input_expr = self.expr_to_sql(first_expr)?; + + let mut flatten = FlattenRelationBuilder::default(); + flatten.input_expr(input_expr); + flatten.outer(unnest.options.preserve_nulls); + + Ok(Some(flatten)) + } + fn is_scan_with_pushdown(scan: &TableScan) -> bool { scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some() } diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index f539c0ddc1e87..3657516d534aa 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -22,8 +22,8 @@ use super::{ rewrite::TableAliasRewriter, }; use datafusion_common::{ - Column, DataFusionError, Result, ScalarValue, assert_eq_or_internal_err, - internal_err, + Column, DataFusionError, Result, ScalarValue, TableReference, + assert_eq_or_internal_err, internal_err, tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::{ @@ -183,6 +183,32 @@ pub(crate) fn unproject_unnest_expr(expr: Expr, unnest: &Unnest) -> Result }).map(|e| e.data) } +/// Like `unproject_unnest_expr`, but for Snowflake FLATTEN: +/// transforms `__unnest_placeholder(...)` column references into +/// `Expr::Column(Column { relation: Some(alias), name: "VALUE" })`. +pub(crate) fn unproject_unnest_expr_as_flatten_value( + expr: Expr, + unnest: &Unnest, + flatten_alias: &str, +) -> Result { + expr.transform(|sub_expr| { + if let Expr::Column(col_ref) = &sub_expr + && unnest + .list_type_columns + .iter() + .any(|e| e.1.output_column.name == col_ref.name) + { + let value_col = Expr::Column(Column::new( + Some(TableReference::bare(flatten_alias)), + "VALUE", + )); + return Ok(Transformed::yes(value_col)); + } + Ok(Transformed::no(sub_expr)) + }) + .map(|e| e.data) +} + /// Recursively identify all Column expressions and transform them into the appropriate /// aggregate expression contained in agg. 
/// diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 923812138c016..db1924f9e11c7 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -26,8 +26,9 @@ use datafusion_expr::test::function_stub::{ count_udaf, max, max_udaf, min_udaf, sum, sum_udaf, }; use datafusion_expr::{ - EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union, - UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame, + ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, + ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union, + UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame, WindowFunctionDefinition, cast, col, lit, table_scan, wildcard, }; use datafusion_functions::unicode; @@ -39,7 +40,7 @@ use datafusion_sql::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_sql::unparser::dialect::{ BigQueryDialect, CustomDialectBuilder, DefaultDialect as UnparserDefaultDialect, DefaultDialect, Dialect as UnparserDialect, MySqlDialect as UnparserMySqlDialect, - PostgreSqlDialect as UnparserPostgreSqlDialect, SqliteDialect, + PostgreSqlDialect as UnparserPostgreSqlDialect, SnowflakeDialect, SqliteDialect, }; use datafusion_sql::unparser::{Unparser, expr_to_sql, plan_to_sql}; use insta::assert_snapshot; @@ -2995,3 +2996,424 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { Ok(()) } + +#[test] +fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3])", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_unnest_to_lateral_flatten_with_cross_join() -> Result<(), 
DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3]), j1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1" CROSS JOIN "j1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_unnest_to_lateral_flatten_cross_join_inline() -> Result<(), DataFusionError> +{ + // Cross join with two inline UNNEST sources — both produce valid FLATTEN. + // NOTE: UNNEST(table.column) is NOT tested with Snowflake because + // LATERAL FLATTEN(INPUT => col) requires the column to be a Snowflake + // VARIANT/ARRAY type, which cannot be validated at unparse time. + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3]) u(c1) JOIN j1 ON u.c1 = j1.j1_id", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#, + ); + Ok(()) +} + +// --- Edge case tests for Snowflake FLATTEN --- + +#[test] +fn snowflake_flatten_implicit_from() -> Result<(), DataFusionError> { + // UNNEST in SELECT clause (no explicit FROM UNNEST) — implicit table factor + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3])", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_string_array() -> Result<(), DataFusionError> { + // String array unnest + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST(['a','b','c'])", + parser_dialect: GenericDialect {}, + unparser_dialect: 
snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE" FROM LATERAL FLATTEN(INPUT => ['a', 'b', 'c']) AS "_unnest_1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3]) as c1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_select_unnest_plus_literal() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT UNNEST([1,2,3]), 1", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "_unnest_1"."VALUE", "Int64(1)" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "_unnest_1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_from_unnest_with_table_alias() -> Result<(), DataFusionError> { + let snowflake = SnowflakeDialect::new(); + roundtrip_statement_with_dialect_helper!( + sql: "SELECT * FROM UNNEST([1,2,3]) AS t1 (c1)", + parser_dialect: GenericDialect {}, + unparser_dialect: snowflake, + expected: @r#"SELECT "t1"."c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "t1""#, + ); + Ok(()) +} + +#[test] +fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> { + // UNNEST operating on an array column produced by a subselect. + // Uses unnest_table which has array_col (List). + // The filter uses array_col IS NOT NULL — a simple predicate + // that doesn't involve struct types (which Snowflake FLATTEN can't handle). 
+    let snowflake = SnowflakeDialect::new();
+    roundtrip_statement_with_dialect_helper!(
+        sql: "SELECT UNNEST(array_col) FROM (SELECT array_col FROM unnest_table WHERE array_col IS NOT NULL LIMIT 3)",
+        parser_dialect: GenericDialect {},
+        unparser_dialect: snowflake,
+        expected: @r#"SELECT "_unnest_1"."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS "_unnest_1""#,
+    );
+    Ok(())
+}
+
+/// Dummy scalar UDF for testing — takes a string and returns List<Int64>.
+/// Simulates any UDF that extracts an array from a column (e.g. parsing
+/// JSON, splitting a delimited string, etc.).
+#[derive(Debug, PartialEq, Eq, Hash)]
+struct ExtractArrayUdf {
+    signature: Signature,
+}
+
+impl ExtractArrayUdf {
+    fn new() -> Self {
+        Self {
+            signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractArrayUdf {
+    fn name(&self) -> &str {
+        "extract_array"
+    }
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::List(Arc::new(Field::new_list_field(
+            DataType::Int64,
+            true,
+        ))))
+    }
+    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        unimplemented!("test stub")
+    }
+}
+
+#[test]
+fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> {
+    // UNNEST on a UDF result: extract_array(col) returns List<Int64>,
+    // then UNNEST flattens it. This exercises the path where the FLATTEN
+    // INPUT is a UDF call rather than a bare column reference.
+    let sql = "SELECT UNNEST(extract_array(j1_string)) AS items FROM j1 LIMIT 5";
+
+    let statement = Parser::new(&GenericDialect {})
+        .try_with_sql(sql)?
+ .parse_statement()?; + + let state = MockSessionState::default() + .with_aggregate_function(max_udaf()) + .with_aggregate_function(min_udaf()) + .with_scalar_function(Arc::new(ScalarUDF::new_from_impl(ExtractArrayUdf::new()))) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())) + .with_expr_planner(Arc::new(NestedFunctionPlanner)) + .with_expr_planner(Arc::new(FieldAccessPlanner)); + + let context = MockContextProvider { state }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel + .sql_statement_to_plan(statement) + .unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}")); + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "items" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => extract_array("j1"."j1_string")) AS "_unnest_1" LIMIT 5"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_limit_between_projection_and_unnest() -> Result<(), DataFusionError> +{ + // Build: Projection → Limit → Unnest → Projection → TableScan + // The optimizer can insert a Limit between the outer Projection and the + // Unnest. The FLATTEN code path must look through transparent nodes + // (Limit, Sort) to find the Unnest. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(5))? // Limit BETWEEN outer Projection and Unnest + .project(vec![col("__unnest_placeholder(items)").alias("item")])? 
+ .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN — the Limit must not prevent FLATTEN detection + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 5"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_sort_between_projection_and_unnest() -> Result<(), DataFusionError> { + // Build: Projection → Sort → Unnest → Projection → TableScan + // Same as Limit test but with Sort instead. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .sort(vec![col("__unnest_placeholder(items)").sort(true, true)])? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN — the Sort must not prevent FLATTEN detection + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" ORDER BY "__unnest_placeholder(items)" ASC NULLS FIRST"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_limit_between_projection_and_unnest_with_subquery_alias() +-> Result<(), DataFusionError> { + // Build: Projection → Limit → Unnest → SubqueryAlias → Projection → TableScan + // Combines the Limit and SubqueryAlias transparent node patterns. 
+ let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .alias("t")? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(10))? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 10"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_composed_expression_wrapping_unnest() -> Result<(), DataFusionError> +{ + // Build: Projection(CAST(placeholder AS Int64) AS item_id) → Unnest → Projection → TableScan + // The outer Projection wraps the unnest output in a function call. + // The FLATTEN code path must detect the placeholder inside the function + // and still emit LATERAL FLATTEN. + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("item_id"), + ])? 
+ .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + // Must contain LATERAL FLATTEN despite the placeholder being inside CAST + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "item_id" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); + Ok(()) +} + +#[test] +fn snowflake_flatten_composed_expression_with_limit() -> Result<(), DataFusionError> { + // Combines both bugs: composed expression + Limit between Projection and Unnest + // Build: Projection(CAST(placeholder AS Int64) AS item_id) → Limit → Unnest → Projection → TableScan + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(5))? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("item_id"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "item_id" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 5"#); + Ok(()) +} + +#[test] +fn snowflake_flatten_multi_expression_projection() -> Result<(), DataFusionError> { + // Build: Projection([CAST(placeholder AS Int64) AS a, CAST(placeholder AS Utf8) AS b]) + // → Unnest → Projection → TableScan + // The outer Projection has TWO expressions — both reference the placeholder. + // The FLATTEN code path must fire even when p.expr.len() > 1. 
+ let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("a"), + cast(col("__unnest_placeholder(items)"), DataType::Utf8).alias("b"), + ])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "a", CAST("_unnest_1"."VALUE" AS VARCHAR) AS "b" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); + Ok(()) +} + +#[test] +fn snowflake_flatten_multi_expression_with_limit() -> Result<(), DataFusionError> { + // Multi-expression + Limit between Projection and Unnest + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .unnest_column("__unnest_placeholder(items)")? + .limit(0, Some(10))? + .project(vec![ + cast(col("__unnest_placeholder(items)"), DataType::Int64).alias("a"), + cast(col("__unnest_placeholder(items)"), DataType::Utf8).alias("b"), + ])? 
+ .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let actual = result.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT CAST("_unnest_1"."VALUE" AS BIGINT) AS "a", CAST("_unnest_1"."VALUE" AS VARCHAR) AS "b" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1" LIMIT 10"#); + Ok(()) +} + +#[test] +fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> { + // Build: Projection → Unnest → SubqueryAlias → Projection → TableScan + // This simulates the plan produced when a virtual/passthrough table + // wraps the source in a SubqueryAlias, which sits between the Unnest + // and its inner Projection. + + let schema = Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + )]); + + let plan = table_scan(Some("source"), &schema, None)? + .project(vec![col("items").alias("__unnest_placeholder(items)")])? + .alias("t")? // SubqueryAlias — this is what breaks + .unnest_column("__unnest_placeholder(items)")? + .project(vec![col("__unnest_placeholder(items)").alias("item")])? + .build()?; + + let snowflake = SnowflakeDialect::new(); + let unparser = Unparser::new(&snowflake); + let result = unparser.plan_to_sql(&plan)?; + let sql_str = result.to_string(); + + // Should contain LATERAL FLATTEN, not error + insta::assert_snapshot!(sql_str, @r#"SELECT "_unnest_1"."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS "_unnest_1""#); + Ok(()) +}