[SPARK-46367][SQL] Support narrowing projection of KeyedPartitioning in PartitioningPreservingUnaryExecNode #55519
Draft
peter-toth wants to merge 1 commit into apache:master
I will rebase this PR once #55538 is merged.
…in PartitioningPreservingUnaryExecNode

### What changes were proposed in this pull request?

When a `KeyedPartitioning` passes through a `PartitioningPreservingUnaryExecNode` (e.g. `ProjectExec`), the previous implementation projected the partitioning as a whole expression via `multiTransformDown`. If any expression position could not be mapped to an output attribute, the entire `KeyedPartitioning` was silently dropped, resulting in `UnknownPartitioning`.

This PR replaces that approach with a per-position projection algorithm implemented in two new private helpers (`projectKeyedPartitionings` and `projectOtherPartitionings`), with the main `outputPartitioning` reduced to a simple split, project, and combine:

1. For each expression position (0..N-1), collect the unique expressions at that position across all input `KeyedPartitioning`s (using `ExpressionSet` to deduplicate semantically equal expressions), then project each through the output aliases via `projectExpression`.
2. Positions with at least one projected alternative are *projectable*; they define the maximum achievable granularity. Positions that cannot be expressed in the output are dropped (narrowing).
3. The shared `partitionKeys` are projected to the subset of projectable positions via `KeyedPartitioning.projectKeys`.
4. The final `KeyedPartitioning`s are the cross-product of per-position alternatives, computed lazily via `MultiTransform.generateCartesianProduct`, deduplicated, and bounded by a single outer `take(aliasCandidateLimit)`.

All resulting `KeyedPartitioning`s at the same granularity share the same `partitionKeys` object, preserving the invariant required by `GroupPartitionsExec`.

A new `isNarrowed: Boolean` flag is added to `KeyedPartitioning` and set to `true` when the projection drops one or more key positions. When `isNarrowed=true` and `isGrouped=false`, `GroupPartitionsExec` would merge original partitions that held distinct keys, carrying the same data-skew risk as `allowJoinKeysSubsetOfPartitionKeys`. `groupedSatisfies` therefore gates such narrowed partitionings behind that config. When `isGrouped=true` after narrowing, the projected keys are still distinct so no merging happens and no config is required.

### Why are the changes needed?

Without this fix, a `ProjectExec` that drops any column of a multi-column partition key causes the entire `KeyedPartitioning` to be lost. This breaks storage-partitioned join optimisations (SPJ) that rely on the partitioning surviving projection (e.g. a subquery that renames or projects away a partition key column).

### Does this PR introduce _any_ user-facing change?

Yes. SPJ is now preserved through `ProjectExec` nodes:

- Alias projections (e.g. `SELECT id AS pk FROM t`) no longer break SPJ.
- Narrowing projections (e.g. `SELECT id FROM t` where `t` is partitioned by `(id, name)`) enable SPJ when the projected keys remain distinct, or when `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys` is enabled and the keys become non-unique.

### How was this patch tested?

Unit tests added/updated in `ProjectedOrderingAndPartitioningSuite`:

- Full-granularity alias substitution
- 2->1 and 3->2 narrowing with and without aliases
- `PartitioningCollection` with mixed projectability
- `isNarrowed=true, isGrouped=false`: `groupedSatisfies` blocked without config, allowed with `allowJoinKeysSubsetOfPartitionKeys`
- `isNarrowed=true, isGrouped=true`: `satisfies` succeeds without config

End-to-end tests added in `KeyGroupedPartitioningSuite`:

- Alias in subquery does not break SPJ
- Narrowing projection with duplicate projected keys requires `allowJoinKeysSubsetOfPartitionKeys`
- Narrowing projection with distinct projected keys triggers SPJ without config

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
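
For readers who want to see the per-position projection and the `isNarrowed` gate in miniature, here is a rough standalone sketch. It is not the code in this PR and uses no Spark classes: `Keyed`, `project` and `passesNarrowingGate` are hypothetical names, expressions are modelled as plain strings, the alias map stands in for `projectExpression`, and "grouped" is assumed to mean that all projected keys are distinct.

```scala
// Illustrative sketch only: Keyed, project and passesNarrowingGate are
// hypothetical stand-ins, not Spark classes or methods.
object NarrowingSketch {
  // Positional key expressions (plain strings here) plus one key tuple per partition.
  final case class Keyed(
      exprs: Seq[String],
      keys: Seq[Seq[Any]],
      isGrouped: Boolean,
      isNarrowed: Boolean = false)

  // aliases maps an input expression to the output attributes that expose it.
  // All inputs are assumed to share the same arity and the same keys.
  def project(inputs: Seq[Keyed], aliases: Map[String, Seq[String]], limit: Int): Seq[Keyed] =
    inputs.headOption.toSeq.flatMap { first =>
      val n = first.exprs.length
      // Step 1: per position, collect the distinct input expressions and project them.
      val alternatives = (0 until n).map { i =>
        inputs.map(_.exprs(i)).distinct.flatMap(e => aliases.getOrElse(e, Nil)).distinct
      }
      // Step 2: keep only positions with at least one projected alternative.
      val kept = (0 until n).filter(i => alternatives(i).nonEmpty)
      if (kept.isEmpty) Nil
      else {
        // Step 3: project the shared keys once so every result reuses the same object.
        val projectedKeys = first.keys.map(k => kept.map(i => k(i)))
        // Assumption of this sketch: "grouped" means all projected keys are distinct.
        val grouped = projectedKeys.distinct.length == projectedKeys.length
        // Step 4: lazy cross-product of the alternatives, bounded by one outer take.
        val combos = kept.foldLeft(Iterator(Vector.empty[String])) { (acc, i) =>
          acc.flatMap(prefix => alternatives(i).iterator.map(alt => prefix :+ alt))
        }
        combos.take(limit).map(es => Keyed(es, projectedKeys, grouped, kept.length < n)).toSeq
      }
    }

  // Narrowed but ungrouped partitionings are only accepted when the config flag is on.
  def passesNarrowingGate(p: Keyed, allowJoinKeysSubset: Boolean): Boolean =
    !(p.isNarrowed && !p.isGrouped) || allowJoinKeysSubset

  def main(args: Array[String]): Unit = {
    val t = Keyed(Seq("id", "name"), Seq(Seq(1, "a"), Seq(1, "b"), Seq(2, "a")), isGrouped = true)
    // SELECT id AS pk FROM t: position 1 (name) has no output alias and is dropped.
    val out = project(Seq(t), Map("id" -> Seq("pk")), limit = 10)
    out.foreach(p => println(s"$p gate(off)=${passesNarrowingGate(p, allowJoinKeysSubset = false)}"))
  }
}
```

In the `main` example the projected keys contain `1` twice, so the sketch marks the result as narrowed and ungrouped, and the gate rejects it unless the flag is passed as `true`. With input keys whose `id` values are already distinct, the same projection stays grouped and passes the gate without the flag.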