diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 9f770f9f45e1d..ca8dfa431b4f5 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -499,6 +499,17 @@ impl Unparser<'_> { ) } LogicalPlan::Sort(sort) => { + // Sort can be top-level plan for derived table + if select.already_projected() { + return self.derive_with_dialect_alias( + "derived_sort", + plan, + relation, + false, + vec![], + ); + } + let Some(query_ref) = query else { return internal_err!( "Sort operator only valid in a statement context." diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index ec1b17cd28a91..e3b644f33f3b5 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -223,7 +223,15 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( let mut collects = p.expr.clone(); for sort in &sort.expr { - collects.push(sort.expr.clone()); + // Strip aliases from sort expressions so the comparison matches + // the inner Projection's raw expressions. The optimizer may add + // sort expressions to the inner Projection without aliases, while + // the Sort node's expressions carry aliases from the original plan. + let mut expr = sort.expr.clone(); + while let Expr::Alias(alias) = expr { + expr = *alias.expr; + } + collects.push(expr); } // Compare outer collects Expr::to_string with inner collected transformed values diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb53..9ef345c3dca34 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1740,6 +1740,42 @@ fn test_sort_with_push_down_fetch() -> Result<()> { Ok(()) } +#[test] +fn test_sort_with_scalar_fn_and_push_down_fetch() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("search_phrase", DataType::Utf8, false), + Field::new("event_time", DataType::Utf8, false), + ]); + + let substr_udf = unicode::substr(); + + // Build a plan that mimics the DF52 optimizer output: + // Projection(search_phrase) → Sort(substr(event_time), fetch=10) + // → Projection(search_phrase, event_time) → Filter → TableScan + // This triggers a subquery because the outer projection differs from the inner one. + // The ORDER BY scalar function must not reference the inner table qualifier. + let plan = table_scan(Some("t1"), &schema, None)? + .filter(col("search_phrase").not_eq(lit("")))? + .project(vec![col("search_phrase"), col("event_time")])? + .sort_with_limit( + vec![ + substr_udf + .call(vec![col("event_time"), lit(1), lit(5)]) + .sort(true, true), + ], + Some(10), + )? + .project(vec![col("search_phrase")])? + .build()?; + + let sql = plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT t1.search_phrase FROM (SELECT t1.search_phrase, t1.event_time FROM t1 WHERE (t1.search_phrase <> '') ORDER BY substr(t1.event_time, 1, 5) ASC NULLS FIRST LIMIT 10)" + ); + Ok(()) +} + #[test] fn test_join_with_table_scan_filters() -> Result<()> { let schema_left = Schema::new(vec![ @@ -1984,7 +2020,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> { }, { assert_snapshot!( sql, - @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# + @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN ((grouping(j1.j1_id) + grouping(j1.j1_string)) = 0) THEN j1.j1_id END ASC NULLS LAST) LIMIT 100"# ); });