From 397a8bed33b93891383a64ff5ab5afa08ced97bb Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Thu, 23 Apr 2026 10:03:31 +0200
Subject: [PATCH] [SPARK-46367][SQL] Support narrowing projection of
 KeyedPartitioning in PartitioningPreservingUnaryExecNode

### What changes were proposed in this pull request?

When a `KeyedPartitioning` passes through a `PartitioningPreservingUnaryExecNode`
(e.g. `ProjectExec`), the previous implementation projected the partitioning as a
whole expression via `multiTransformDown`. If any expression position could not be
mapped to an output attribute, the entire `KeyedPartitioning` was silently dropped,
resulting in `UnknownPartitioning`.

This PR replaces that approach with a per-position projection algorithm implemented
in two new private helpers (`projectKeyedPartitionings` and
`projectOtherPartitionings`), with the main `outputPartitioning` reduced to a simple
split, project, and combine:

1. For each expression position (0..N-1), collect the unique expressions at that
   position across all input `KeyedPartitioning`s (using `ExpressionSet` to
   deduplicate semantically equal expressions), then project each through the
   output aliases via `projectExpression`.
2. Positions with at least one projected alternative are *projectable*; they define
   the maximum achievable granularity. Positions that cannot be expressed in the
   output are dropped (narrowing).
3. The shared `partitionKeys` are projected to the subset of projectable positions
   via `KeyedPartitioning.projectKeys`.
4. The final `KeyedPartitioning`s are the cross-product of per-position
   alternatives, computed lazily via `MultiTransform.generateCartesianProduct`,
   deduplicated per position, and bounded by a single outer
   `take(aliasCandidateLimit)`.

All resulting `KeyedPartitioning`s at the same granularity share the same
`partitionKeys` object, preserving the invariant required by `GroupPartitionsExec`.
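The four steps above can be sketched in a simplified, self-contained form.
`Keyed` and `projectKeyed` below are hypothetical stand-ins (string expressions,
integer partition keys, aliases as a plain `Map`), not the actual Spark types or
the `projectKeyedPartitionings` implementation in this patch:

```scala
// Hypothetical, simplified model of the per-position projection algorithm.
case class Keyed(expressions: Seq[String], keys: Seq[Seq[Int]], isNarrowed: Boolean)

def projectKeyed(kp: Keyed, aliases: Map[String, Seq[String]]): Seq[Keyed] = {
  val n = kp.expressions.length
  // Step 1: per-position alternatives that survive the projection.
  val alts: Seq[Seq[String]] = kp.expressions.map(e => aliases.getOrElse(e, Seq.empty))
  // Step 2: projectable positions define the achievable granularity.
  val projectable = (0 until n).filter(i => alts(i).nonEmpty)
  if (projectable.isEmpty) return Seq.empty // whole partitioning is lost
  // Step 3: narrow every partition key to the projectable positions; all results
  // share this one object, mirroring the shared-partitionKeys invariant.
  val sharedKeys = kp.keys.map(key => projectable.map(i => key(i)))
  // Step 4: cross-product of per-position alternatives.
  val cross = projectable.foldLeft(Seq(Seq.empty[String])) { (acc, i) =>
    for (prefix <- acc; alt <- alts(i)) yield prefix :+ alt
  }
  cross.map(exprs => Keyed(exprs, sharedKeys, isNarrowed = projectable.length < n))
}
```

For example, projecting `Keyed(Seq("id", "name"), Seq(Seq(1, 10), Seq(1, 20), Seq(2, 10)), isNarrowed = false)`
through `Map("id" -> Seq("id", "pk"))` drops `name` and yields two narrowed
`Keyed`s over the shared keys `Seq(Seq(1), Seq(1), Seq(2))` -- duplicate projected
keys, i.e. the ungrouped case that needs `allowJoinKeysSubsetOfPartitionKeys`.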
A new `isNarrowed: Boolean` flag is added to `KeyedPartitioning` and set to `true`
when the projection drops one or more key positions. When `isNarrowed=true` and
`isGrouped=false`, `GroupPartitionsExec` would merge original partitions that held
distinct keys, carrying the same data-skew risk as
`allowJoinKeysSubsetOfPartitionKeys`. `groupedSatisfies` therefore gates such
narrowed partitionings behind that config. When `isGrouped=true` after narrowing,
the projected keys are still distinct so no merging happens and no config is
required.

### Why are the changes needed?

Without this fix, a `ProjectExec` that drops any column of a multi-column partition
key causes the entire `KeyedPartitioning` to be lost. This breaks
storage-partitioned join optimisations (SPJ) that rely on the partitioning
surviving projection (e.g. a subquery that renames or projects away a partition key
column).

### Does this PR introduce _any_ user-facing change?

Yes. SPJ is now preserved through `ProjectExec` nodes:

- Alias projections (e.g. `SELECT id AS pk FROM t`) no longer break SPJ.
- Narrowing projections (e.g. `SELECT id FROM t` where `t` is partitioned by
  `(id, name)`) enable SPJ when the projected keys remain distinct, or when
  `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys` is enabled
  and the keys become non-unique.

### How was this patch tested?
Unit tests added/updated in `ProjectedOrderingAndPartitioningSuite`:

- Full-granularity alias substitution
- 2->1 and 3->2 narrowing with and without aliases
- `PartitioningCollection` with mixed projectability
- `isNarrowed=true, isGrouped=false`: `groupedSatisfies` blocked without config,
  allowed with `allowJoinKeysSubsetOfPartitionKeys`
- `isNarrowed=true, isGrouped=true`: `satisfies` succeeds without config

End-to-end tests added in `KeyGroupedPartitioningSuite`:

- Alias in subquery does not break SPJ
- Narrowing projection with duplicate projected keys requires
  `allowJoinKeysSubsetOfPartitionKeys`
- Narrowing projection with distinct projected keys triggers SPJ without config

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
---
 .../plans/physical/partitioning.scala         |  44 ++--
 .../AliasAwareOutputExpression.scala          | 114 ++++++++--
 .../exchange/EnsureRequirements.scala         |   4 +-
 .../exchange/ShuffleExchangeExec.scala        |   2 +-
 .../DistributionAndOrderingSuiteBase.scala    |   2 +-
 .../KeyGroupedPartitioningSuite.scala         | 121 +++++++++++
 ...rojectedOrderingAndPartitioningSuite.scala | 203 +++++++++++++++++-
 .../exchange/EnsureRequirementsSuite.scala    |   2 +-
 8 files changed, 455 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 6fb2ae3821e09..1f820bb86a06c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -353,16 +353,18 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
  * `KeyedPartitioning` is used in two distinct forms:
  *
  * 1.
'''As outputPartitioning''': When used as a node's output partitioning (e.g., in - * `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are always in sorted order. - * This is how leaf data source nodes produce partition keys originally, and this ordering is - * preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned - * join compatibility. + * `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are typically in sorted order + * because data sources produce them that way and `GroupPartitionsExec` sorts while grouping. + * Sorted order is not a hard requirement, but it is a useful property: when both sides of a + * storage-partitioned join report sorted keys, `EnsureRequirements` can often match them + * without inserting an additional `GroupPartitionsExec`. After a narrowing projection through + * `PartitioningPreservingUnaryExecNode`, the projected keys may no longer be sorted; this is + * acceptable because `EnsureRequirements` can always reconcile both sides via + * `GroupPartitionsExec` with `expectedPartitionKeys`. * - * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not be - * in sorted order. This occurs because `KeyedShuffleSpec` can project the partition keys by join - * key positions. The `EnsureRequirements` rule ensures that either the unordered keys from both - * sides of a join match exactly, or it builds a common ordered set of keys and pushes them down - * to `GroupPartitionsExec` on both sides to establish a compatible ordering. + * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not + * be in sorted order. `EnsureRequirements` handles this by building a common ordered set of + * keys and pushing them down to `GroupPartitionsExec` on both sides. * * == Partition Keys == * - `partitionKeys`: The partition keys, one per partition. 
May contain duplicates initially @@ -417,16 +419,23 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa * * @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`). * @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper for efficient - * comparison and grouping. One per partition. When used as outputPartitioning, - * always in sorted order. When used in `KeyedShuffleSpec`, may be unsorted - * after projection. May contain duplicates when ungrouped. + * comparison and grouping. One per partition. Typically in sorted order when + * produced by a data source or `GroupPartitionsExec`, but this is not + * guaranteed after projection. May contain duplicates when ungrouped. * @param isGrouped Whether partition keys are unique (no duplicates). Computed on first * creation, then preserved through copy operations to avoid recomputation. + * @param isNarrowed Whether this partitioning was derived from a finer-grained one by dropping + * key positions (e.g. via `PartitioningPreservingUnaryExecNode`). When true, + * `GroupPartitionsExec` will merge partitions that shared distinct keys in the + * original partitioning, carrying the same skew risk as + * `allowJoinKeysSubsetOfPartitionKeys`. Such a partitioning will not satisfy + * `ClusteredDistribution` unless that config is enabled. */ case class KeyedPartitioning( expressions: Seq[Expression], @transient partitionKeys: Seq[InternalRowComparableWrapper], - isGrouped: Boolean) extends Expression with Partitioning with Unevaluable { + isGrouped: Boolean, + isNarrowed: Boolean = false) extends Expression with Partitioning with Unevaluable { override val numPartitions = partitionKeys.length override def children: Seq[Expression] = expressions @@ -480,13 +489,20 @@ case class KeyedPartitioning( c.areAllClusterKeysMatched(expressions) } else { // We'll need to find leaf attributes from the partition expressions first. 
- val attributes = expressions.flatMap(_.collectLeaves()) + lazy val attributes = expressions.flatMap(_.collectLeaves()) if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { // check that join keys (required clustering keys) // overlap with partition keys (KeyedPartitioning attributes) requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) && expressions.forall(_.collectLeaves().size == 1) + } else if (isNarrowed && !isGrouped) { + // A narrowed, non-grouped partitioning carries the same skew risk as using a subset + // of partition keys for a join: GroupPartitionsExec will merge partitions that held + // distinct keys in the original finer-grained partitioning. Require the same config + // to opt in. (When isGrouped=true the projected keys are already distinct, so no + // merging happens and there is no skew risk.) + false } else { attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 3b847b5852b13..872c1197ef950 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.execution import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet} import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import 
org.apache.spark.sql.catalyst.trees.MultiTransform /** * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that @@ -29,35 +30,116 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { - val partitionings: Seq[Partitioning] = if (hasAlias) { - flattenPartitioning(child.outputPartitioning).iterator.flatMap { + val (keyedPartitionings, otherPartitionings) = + flattenPartitioning(child.outputPartitioning).partition(_.isInstanceOf[KeyedPartitioning]) + + val projectedKPs = + projectKeyedPartitionings(keyedPartitionings.map(_.asInstanceOf[KeyedPartitioning])) + val projectedOthers = projectOtherPartitionings(otherPartitionings) + + (projectedKPs ++ projectedOthers).take(aliasCandidateLimit) match { + case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) + case Seq(p) => p + case ps => PartitioningCollection(ps) + } + } + + /** + * Projects non-[[KeyedPartitioning]] partitionings through the current node's output expressions. + * + * With aliases, each partitioning expression is substituted with all possible alias combinations; + * without aliases, partitionings whose expressions reference attributes outside the output are + * dropped. 
+ */ + private def projectOtherPartitionings( + partitionings: Seq[Partitioning]): LazyList[Partitioning] = { + if (hasAlias) { + partitionings.to(LazyList).flatMap { case e: Expression => // We need unique partitionings but if the input partitioning is - // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after - // the projection we have 4 partitionings: + // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then + // after the projection we have 4 partitionings: // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`, // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. val partitioningSet = mutable.Set.empty[Expression] projectExpression(e) .filter(e => partitioningSet.add(e.canonicalized)) - .take(aliasCandidateLimit) .asInstanceOf[LazyList[Partitioning]] - case o => Seq(o) - }.take(aliasCandidateLimit).toSeq + case o => LazyList(o) + } } else { - // Filter valid partitiongs (only reference output attributes of the current plan node) + // Filter valid partitionings (only reference output attributes of the current plan node) val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - flattenPartitioning(child.outputPartitioning).filter { + partitionings.to(LazyList).filter { case e: Expression => e.references.subsetOf(outputSet) case _ => true } } - partitionings match { - case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) - case Seq(p) => p - case ps => PartitioningCollection(ps) - } + } + + /** + * Projects all input [[KeyedPartitioning]]s through the current node's output expressions. + * + * For each expression position (0..N-1), collects the unique expressions at that position across + * all input KPs, projects each through the output aliases, and unions the alternatives. 
+ * Positions with at least one alternative are projectable; their count determines the maximum + * achievable granularity. Positions that cannot be expressed in the output are dropped. + * + * The resulting [[KeyedPartitioning]]s are the cross-product of the per-position alternatives + * restricted to the projectable positions. All share the same `partitionKeys` object (projected + * to the same subset of positions), preserving the invariant required by [[GroupPartitionsExec]]. + */ + private def projectKeyedPartitionings( + kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = { + if (kps.isEmpty) return LazyList.empty + val numPositions = kps.head.expressions.length + + val alternativesPerPosition: IndexedSeq[LazyList[Expression]] = + if (hasAlias) { + // For each position, gather unique expressions across all KPs (ExpressionSet deduplicates + // semantically equal expressions, e.g. the same join-key column shared by both KP sides) + // and project each through the output aliases. + (0 until numPositions).map { i => + val seen = mutable.Set.empty[Expression] + ExpressionSet(kps.map(_.expressions(i))).to(LazyList).flatMap { expr => + projectExpression(expr).filter(e => seen.add(e.canonicalized)) + } + } + } else { + // No aliases: filter out non-projectable expressions first, then deduplicate. + val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) + (0 until numPositions).map { i => + ExpressionSet(kps.collect { + case kp if kp.expressions(i).references.subsetOf(outputSet) => kp.expressions(i) + }).to(LazyList) + } + } + + // Non-empty positions define the maximum achievable granularity. + val projectablePositions = + (0 until numPositions).filter(i => alternativesPerPosition(i).nonEmpty) + + if (projectablePositions.isEmpty) return LazyList.empty + + // All input KPs share the same partitionKeys by invariant; use the first as the key source. 
+ val keySource = kps.head + val sharedKeys = + if (projectablePositions.length == numPositions) keySource.partitionKeys + else keySource.projectKeys(projectablePositions)._2 + + val isGrouped = sharedKeys.distinct.size == sharedKeys.size + val isNarrowed = projectablePositions.length < numPositions + + // Cross-product the per-position alternatives to produce all concrete KPs. + // Note: generateCartesianProduct expects thunks () => Seq[T], but wrapping LazyLists in thunks + // here is not strictly necessary since they are already lazy -- we do it only to match the API. + // No deduplication is needed here: per-position alternatives are already canonically distinct, + // so all cross-product combinations are distinct by construction. + MultiTransform.generateCartesianProduct( + projectablePositions.map(i => () => alternativesPerPosition(i))) + .map(projectedExprs => + new KeyedPartitioning(projectedExprs, sharedKeys, isGrouped, isNarrowed)) } private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 1fff6c22c5ad8..0c5d54a71ad0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -372,12 +372,12 @@ case class EnsureRequirements( reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys) .orElse(reorderJoinKeysRecursively( leftKeys, rightKeys, leftPartitioning, None)) - case (Some(KeyedPartitioning(clustering, _, _)), _) => + case (Some(KeyedPartitioning(clustering, _, _, _)), _) => val leafExprs = clustering.flatMap(_.collectLeaves()) reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, leftKeys) .orElse(reorderJoinKeysRecursively( leftKeys, rightKeys, None, 
rightPartitioning)) - case (_, Some(KeyedPartitioning(clustering, _, _))) => + case (_, Some(KeyedPartitioning(clustering, _, _, _))) => val leafExprs = clustering.flatMap(_.collectLeaves()) reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, rightKeys) .orElse(reorderJoinKeysRecursively( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 7dcbf3779b93d..338728996c3ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -402,7 +402,7 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) row => projection(row) case SinglePartition => identity - case KeyedPartitioning(expressions, _, _) => + case KeyedPartitioning(expressions, _, _, _) => row => bindReferences(expressions, outputAttributes).map(_.eval(row)) case s: ShufflePartitionIdPassThrough => // For ShufflePartitionIdPassThrough, the expression directly evaluates to the partition ID diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala index d5825432ebbf1..0273a5d6dd494 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala @@ -50,7 +50,7 @@ abstract class DistributionAndOrderingSuiteBase plan: QueryPlan[T]): Partitioning = partitioning match { case HashPartitioning(exprs, numPartitions) => HashPartitioning(exprs.map(resolveAttrs(_, plan)), numPartitions) - case KeyedPartitioning(expressions, partitionKeys, isGrouped) => + case 
KeyedPartitioning(expressions, partitionKeys, isGrouped, _) => KeyedPartitioning(expressions.map(resolveAttrs(_, plan)), partitionKeys, isGrouped) case PartitioningCollection(partitionings) => PartitioningCollection(partitionings.map(resolvePartitioning(_, plan))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 4a406322a5a19..e562deb5734ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3913,4 +3913,125 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } } + + test("SPARK-46367: SPJ: partition key alias in subquery projects KeyedPartitioning") { + // A subquery that renames a partition key (id -> pk) creates a ProjectExec between the scan and + // the join. This test verifies that KeyedPartitioning expressions are correctly projected + // through aliases so that SPJ still works without a shuffle. Both sides have the same partition + // key sequence so no GroupPartitionsExec is needed. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + + s"(3, 19.5, cast('2020-02-01' as timestamp))") + + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.pk, p.price AS purchase_price + |FROM (SELECT id AS pk FROM testcat.ns.$items) sub + |JOIN testcat.ns.$purchases p + |ON sub.pk = p.item_id + |ORDER BY pk, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle when partition key is aliased in subquery") + + checkAnswer(df, Seq(Row(1, 42.0f), Row(2, 11.0f), Row(3, 19.5f))) + } + + test("SPARK-46367: SPJ: narrowing projection requires allowJoinKeysSubsetOfPartitionKeys") { + // items is partitioned by (id, name). The subquery projects away 'name', narrowing + // KeyedPartitioning([id, name]) -> KeyedPartitioning([id]) with isNarrowed=true. + // Because id=1 maps to two original partitions ("aa" and "bb"), isGrouped=false. + // GroupPartitionsExec would merge them, carrying the same skew risk as subset partition + // keys -- so SPJ requires allowJoinKeysSubsetOfPartitionKeys to be enabled. 
+ val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'bb', 41.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'cc', 10.0, cast('2020-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp))") + + Seq(true, false).foreach { allowSubset => + withSQLConf( + SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowSubset.toString) { + + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.id, p.price AS purchase_price + |FROM (SELECT id FROM testcat.ns.$items WHERE name >= 'aa') sub + |JOIN testcat.ns.$purchases p + |ON sub.id = p.item_id + |ORDER BY sub.id, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (allowSubset) { + assert(shuffles.isEmpty, "SPJ should be triggered with config enabled") + } else { + assert(shuffles.nonEmpty, "SPJ should not be triggered without config") + } + + checkAnswer(df, Seq(Row(1, 42.0f), Row(1, 42.0f), Row(2, 11.0f))) + } + } + } + + test("SPARK-46367: SPJ: narrowing projection with distinct projected keys does not require " + + "allowJoinKeysSubsetOfPartitionKeys") { + // items is partitioned by (id, name) but each id value is unique, so projecting away 'name' + // produces KeyedPartitioning([id]) with isNarrowed=true but isGrouped=true. + // Because no two original partitions share the same projected key, GroupPartitionsExec does not + // merge any partitions -- no skew risk -- so SPJ works without config. 
+ val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + + s"(3, 19.5, cast('2020-02-01' as timestamp))") + + withSQLConf( + SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") { + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.id, p.price AS purchase_price + |FROM (SELECT id FROM testcat.ns.$items WHERE name >= 'aa') sub + |JOIN testcat.ns.$purchases p + |ON sub.id = p.item_id + |ORDER BY sub.id, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, + "should not add shuffle: narrowed KP remains grouped so no skew risk") + + checkAnswer(df, Seq(Row(1, 42.0f), Row(2, 11.0f), Row(3, 19.5f))) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index ec13d48d45f84..9ff17166d3812 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference} -import 
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{IntegerType, StringType} class ProjectedOrderingAndPartitioningSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { @@ -210,6 +210,205 @@ class ProjectedOrderingAndPartitioningSuite assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a") assert(outputOrdering.head.sameOrderExpressions.size == 0) } + + test("SPARK-46367: KeyedPartitioning expressions are projected through " + + "PartitioningPreservingUnaryExecNode") { + val a = AttributeReference("a", IntegerType)() + val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3)) + val child = DummyLeafExecWithPartitioning( + output = Seq(a), + partitioning = KeyedPartitioning(Seq(a), partitionKeys)) + val b = Alias(a, "b")() + val project = ProjectExec(Seq(b), child) + + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.expressions === Seq(b.toAttribute), + "expressions must reference the aliased attribute, not the original") + assert(kp.partitionKeys === + child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys, + "partition keys must be preserved after projection") + case other => + fail(s"Expected KeyedPartitioning, got $other") + } + } + + test("SPARK-46367: narrowing projection on KeyedPartitioning produces projected partition keys") { + // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce + // KP([x], [(1),(1),(2),(2)]) -- granularity narrows from 2 to 1. 
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val project = ProjectExec(Seq(x), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(x),
+          "narrowed partitioning must keep the projected expression")
+        assert(kp.numPartitions === 4,
+          "partition count must be preserved")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection with alias shares partition keys across alternatives") {
+    // KP([x, y], ...) through Project(x, x as x_alias) should produce
+    // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the same keys1d object.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val xAlias = Alias(x, "x_alias")()
+    val project = ProjectExec(Seq(x, xAlias), child)
+
+    project.outputPartitioning match {
+      case pc: PartitioningCollection =>
+        val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+        assert(kps.forall(_.expressions.length == 1),
+          "all narrowed KPs must have 1 expression")
+        assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet
+          === Set("x", "x_alias"),
+          "both the original and aliased attribute must appear")
+        // The invariant: all KPs in the collection must share the same partitionKeys object.
+        assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+          "all KPs must share the same partitionKeys object")
+      case other =>
+        fail(s"Expected PartitioningCollection, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias") {
+    // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is dropped.
+    // Expected: PC(KP([x, y], keys2d), KP([x_alias, y], keys2d)) where both share keys2d.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val z = AttributeReference("z", IntegerType)()
+    val keys3d = Seq(InternalRow(1, 1, 1), InternalRow(1, 1, 2), InternalRow(1, 2, 1),
+      InternalRow(2, 1, 1), InternalRow(2, 2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y, z),
+      partitioning = KeyedPartitioning(Seq(x, y, z), keys3d))
+    val xAlias = Alias(x, "x_alias")()
+    val project = ProjectExec(Seq(x, xAlias, y), child)
+
+    project.outputPartitioning match {
+      case pc: PartitioningCollection =>
+        val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+        assert(kps.forall(_.expressions.length == 2),
+          "narrowed KPs must have 2 expressions (z dropped, x and y kept)")
+        assert(kps.map(_.expressions.map(_.asInstanceOf[Attribute].name)).toSet ===
+          Set(Seq("x", "y"), Seq("x_alias", "y")))
+        assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+          "all narrowed KPs must share the same partitionKeys object")
+      case other =>
+        fail(s"Expected PartitioningCollection, got $other")
+    }
+  }
+
+  test("SPARK-46367: PartitioningCollection KPs with mixed projectability produce " +
+    "correct per-position cross-product") {
+    // PC(KP([x,y], keys2d), KP([x,y_alias], keys2d)) through Project(x, x as x_alias, y_alias):
+    // Per-position projection across both KPs:
+    //   position 0: ExpressionSet({x, x}) = {x} => projectExpression(x) = [x, x_alias]
+    //   position 1: ExpressionSet({y, y_alias}) => projectExpression(y) = [] (y not in output),
+    //     projectExpression(y_alias) = [y_alias]
+    //   => alternatives: [y_alias]
+    // Cross-product [x, x_alias] x [y_alias] => KP([x,y_alias], keys2d), KP([x_alias,y_alias],
+    // keys2d). Both share the same keys2d object.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val yAlias = AttributeReference("y_alias", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val childPartitioning = PartitioningCollection(Seq(
+      KeyedPartitioning(Seq(x, y), keys2d),
+      KeyedPartitioning(Seq(x, yAlias), keys2d)))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y, yAlias), partitioning = childPartitioning)
+    val xAlias = Alias(x, "x_alias")()
+    val project = ProjectExec(Seq(x, xAlias, yAlias), child)
+
+    project.outputPartitioning match {
+      case pc: PartitioningCollection =>
+        val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+        assert(kps.forall(_.expressions.length == 2),
+          "only full-granularity (2-expr) results must be returned; narrowed ones are subsumed")
+        assert(kps.map(_.expressions.map(_.asInstanceOf[Attribute].name)).toSet ===
+          Set(Seq("x", "y_alias"), Seq("x_alias", "y_alias")),
+          "both x/y_alias and x_alias/y_alias projections must appear")
+        // The invariant: all KPs must share the same partitionKeys object.
+        assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+          "all KPs must share the same partitionKeys object")
+      case other =>
+        fail(s"Expected PartitioningCollection, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection with duplicate keys requires " +
+    "allowJoinKeysSubsetOfPartitionKeys to satisfy ClusteredDistribution") {
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+
+    // Scenario 1: projected keys have duplicates (x-values: 1, 1, 2) -> isGrouped=false.
+    // GroupPartitionsExec would merge the two x=1 partitions, carrying the same skew risk as
+    // allowJoinKeysSubsetOfPartitionKeys. EnsureRequirements calls groupedSatisfies() directly.
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1))
+    val project = ProjectExec(Seq(x),
+      DummyLeafExecWithPartitioning(output = Seq(x, y),
+        partitioning = KeyedPartitioning(Seq(x, y), keys2d)))
+
+    withSQLConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") {
+      project.outputPartitioning match {
+        case kp: KeyedPartitioning =>
+          assert(!kp.isGrouped, "narrowed keys must have duplicates (1 appears twice)")
+          assert(kp.isNarrowed, "projection must mark the KP as narrowed")
+          assert(!kp.groupedSatisfies(ClusteredDistribution(Seq(x))),
+            "narrowed ungrouped KP must not satisfy via groupedSatisfies without config")
+        case other => fail(s"Expected KeyedPartitioning, got $other")
+      }
+    }
+
+    withSQLConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
+      project.outputPartitioning match {
+        case kp: KeyedPartitioning =>
+          assert(kp.groupedSatisfies(ClusteredDistribution(Seq(x))),
+            "narrowed ungrouped KP must satisfy via groupedSatisfies when config is enabled")
+        case other => fail(s"Expected KeyedPartitioning, got $other")
+      }
+    }
+
+    // Scenario 2: projected keys are distinct (x-values: 1, 2, 3) -> isGrouped=true.
+    // Each projected key maps to exactly one original partition so GroupPartitionsExec does not
+    // merge any partitions. No skew risk: must satisfy ClusteredDistribution regardless of config.
+    val keys2dDistinct = Seq(InternalRow(1, 1), InternalRow(2, 2), InternalRow(3, 3))
+    val projectDistinct = ProjectExec(Seq(x),
+      DummyLeafExecWithPartitioning(output = Seq(x, y),
+        partitioning = KeyedPartitioning(Seq(x, y), keys2dDistinct)))
+
+    withSQLConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") {
+      projectDistinct.outputPartitioning match {
+        case kp: KeyedPartitioning =>
+          assert(kp.isGrouped, "distinct projected keys must be grouped")
+          assert(kp.isNarrowed, "projection must mark the KP as narrowed")
+          assert(kp.satisfies(ClusteredDistribution(Seq(x))),
+            "grouped narrowed KP must satisfy ClusteredDistribution without config (no merging)")
+        case other => fail(s"Expected KeyedPartitioning, got $other")
+      }
+    }
+  }
+}
+
+private case class DummyLeafExecWithPartitioning(
+    output: Seq[Attribute],
+    partitioning: Partitioning
+  ) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = null
+  override def outputPartitioning: Partitioning = partitioning
 }
 
 private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 9c67a334c801c..f535e2b415ba9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -1128,7 +1128,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     EnsureRequirements.apply(smjExec) match {
       case ShuffledHashJoinExec(_, _, _, _, _,
           DummySparkPlan(_, _, left: KeyedPartitioning, _, _),
-          ShuffleExchangeExec(KeyedPartitioning(attrs, pks, _),
+          ShuffleExchangeExec(KeyedPartitioning(attrs, pks, _, _),
           DummySparkPlan(_, _, SinglePartition, _, _), _, _), _) =>
         assert(left.expressions == a1 :: Nil)
         assert(attrs == a1 :: Nil)
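
The split/project/combine algorithm the tests above exercise can be sketched outside the patch. This is a hypothetical, heavily simplified model: expressions are plain strings, the alias map stands in for `projectExpression`, and the `ExpressionSet` deduplication, `MultiTransform.generateCartesianProduct` laziness, and `aliasCandidateLimit` bound from the real helpers are omitted.

```scala
// Sketch of per-position narrowing projection (names are illustrative only).
object NarrowingProjectionSketch {

  // Step 1: for each key position, collect the output alternatives it
  // projects to. An empty Seq means the position cannot be expressed in
  // the output.
  private def projectPositions(
      exprs: Seq[String],
      aliases: Map[String, Seq[String]]): Seq[Seq[String]] =
    exprs.map(e => aliases.getOrElse(e, Seq.empty))

  // Steps 2-4: drop unprojectable positions (narrowing), then build every
  // combination of per-position alternatives. All results have the same
  // narrowed shape, so one projected partitionKeys object could back them all.
  def project(
      exprs: Seq[String],
      aliases: Map[String, Seq[String]]): (Boolean, Seq[Seq[String]]) = {
    val alternatives = projectPositions(exprs, aliases).filter(_.nonEmpty)
    val isNarrowed = alternatives.length < exprs.length
    val crossProduct = alternatives.foldLeft(Seq(Seq.empty[String])) {
      (acc, alts) => for (prefix <- acc; alt <- alts) yield prefix :+ alt
    }
    (isNarrowed, crossProduct)
  }
}
```

Mirroring the "3 to 2 expressions" test, `project(Seq("x", "y", "z"), Map("x" -> Seq("x", "x_alias"), "y" -> Seq("y")))` drops `z` (so `isNarrowed` is `true`) and yields the two alternatives `Seq("x", "y")` and `Seq("x_alias", "y")`.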