From 0203bd186a77149afd86e4e3764696952dbc9e61 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Wed, 22 Apr 2026 19:58:05 +0800 Subject: [PATCH] [FLINK-38397][table] Fix stale sort trait on BatchPhysicalLocalHashAggregate causing ArrayIndexOutOfBoundsException --- .../batch/BatchPhysicalHashAggRule.scala | 6 ++++-- .../planner/plan/batch/table/AggregateTest.xml | 13 +++++++++++++ .../planner/plan/batch/table/AggregateTest.scala | 16 ++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala index 355c1d0779a09..cb4e0df820eeb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala @@ -30,7 +30,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils.isOperatorDisabled import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelCollations, RelNode} import scala.collection.JavaConversions._ @@ -101,7 +101,9 @@ class BatchPhysicalHashAggRule // create BatchPhysicalLocalHashAggregate val localRequiredTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) val newInput = RelOptRule.convert(input, localRequiredTraitSet) - val providedTraitSet = localRequiredTraitSet + // A hash aggregate does not preserve input ordering; reset the collation to EMPTY + // so the local agg does not advertise a stale collation from its input. + val providedTraitSet = localRequiredTraitSet.replace(RelCollations.EMPTY) val localHashAgg = createLocalAgg( agg.getCluster, providedTraitSet, diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/AggregateTest.xml index 6f6ce9785d010..0c5c22da4a7d3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/AggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/AggregateTest.xml @@ -71,6 +71,19 @@ HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final +- LocalHashAggregate(select=[Partial_AVG(a) AS (sum$0, count$1), Partial_SUM(b) AS sum$2, Partial_COUNT(c) AS count$3, Partial_SUM($f3) AS sum$4]) +- Calc(select=[CAST(1 AS INTEGER) AS a, b, c, c._1 AS $f3], where=[(a = 1)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/AggregateTest.scala index 561eb11e1ba13..cd10ad3b70dc3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/AggregateTest.scala @@ -73,4 +73,20 @@ class AggregateTest extends TableTestBase { util.verifyExecPlan(resultTable) } + + @Test + def testGroupAggregateAfterOrderBy(): Unit = { + // order_by before a group_by that prunes the sort-key column must not + // leave a stale collation on the local hash aggregate. + val util = batchTestUtil() + val sourceTable = util + .addTableSource[(Int, Long, Int, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'e) + + val resultTable = sourceTable + .orderBy('c) + .groupBy('d) + .select('e.count.as('cnt)) + + util.verifyExecPlan(resultTable) + } }