Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,16 +353,18 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
* `KeyedPartitioning` is used in two distinct forms:
*
 * 1. '''As outputPartitioning''': When used as a node's output partitioning (e.g., in
 *    `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are typically in sorted order
 *    because data sources produce them that way and `GroupPartitionsExec` sorts while grouping.
 *    Sorted order is not a hard requirement, but it is a useful property: when both sides of a
 *    storage-partitioned join report sorted keys, `EnsureRequirements` can often match them
 *    without inserting an additional `GroupPartitionsExec`. After a narrowing projection through
 *    `PartitioningPreservingUnaryExecNode`, the projected keys may no longer be sorted; this is
 *    acceptable because `EnsureRequirements` can always reconcile both sides via
 *    `GroupPartitionsExec` with `expectedPartitionKeys`.
 *
 * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not
 *    be in sorted order. `EnsureRequirements` handles this by building a common ordered set of
 *    keys and pushing them down to `GroupPartitionsExec` on both sides.
*
* == Partition Keys ==
* - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially
Expand Down Expand Up @@ -417,16 +419,23 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
*
* @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`).
 * @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper for efficient
 *                      comparison and grouping. One per partition. Typically in sorted order when
 *                      produced by a data source or `GroupPartitionsExec`, but this is not
 *                      guaranteed after projection. May contain duplicates when ungrouped.
* @param isGrouped Whether partition keys are unique (no duplicates). Computed on first
* creation, then preserved through copy operations to avoid recomputation.
* @param isNarrowed Whether this partitioning was derived from a finer-grained one by dropping
* key positions (e.g. via `PartitioningPreservingUnaryExecNode`). When true,
* `GroupPartitionsExec` will merge partitions that shared distinct keys in the
* original partitioning, carrying the same skew risk as
* `allowJoinKeysSubsetOfPartitionKeys`. Such a partitioning will not satisfy
* `ClusteredDistribution` unless that config is enabled.
*/
case class KeyedPartitioning(
expressions: Seq[Expression],
@transient partitionKeys: Seq[InternalRowComparableWrapper],
isGrouped: Boolean) extends Expression with Partitioning with Unevaluable {
isGrouped: Boolean,
isNarrowed: Boolean = false) extends Expression with Partitioning with Unevaluable {
override val numPartitions = partitionKeys.length

override def children: Seq[Expression] = expressions
Expand Down Expand Up @@ -480,13 +489,20 @@ case class KeyedPartitioning(
c.areAllClusterKeysMatched(expressions)
} else {
// We'll need to find leaf attributes from the partition expressions first.
val attributes = expressions.flatMap(_.collectLeaves())
lazy val attributes = expressions.flatMap(_.collectLeaves())

if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
// check that join keys (required clustering keys)
// overlap with partition keys (KeyedPartitioning attributes)
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
expressions.forall(_.collectLeaves().size == 1)
} else if (isNarrowed && !isGrouped) {
// A narrowed, non-grouped partitioning carries the same skew risk as using a subset
// of partition keys for a join: GroupPartitionsExec will merge partitions that held
// distinct keys in the original finer-grained partitioning. Require the same config
// to opt in. (When isGrouped=true the projected keys are already distinct, so no
// merging happens and there is no skew risk.)
false
} else {
attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.sql.execution

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.catalyst.trees.MultiTransform

/**
* A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
Expand All @@ -29,35 +30,116 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
with AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
val partitionings: Seq[Partitioning] = if (hasAlias) {
flattenPartitioning(child.outputPartitioning).iterator.flatMap {
val (keyedPartitionings, otherPartitionings) =
flattenPartitioning(child.outputPartitioning).partition(_.isInstanceOf[KeyedPartitioning])

val projectedKPs =
projectKeyedPartitionings(keyedPartitionings.map(_.asInstanceOf[KeyedPartitioning]))
val projectedOthers = projectOtherPartitionings(otherPartitionings)

(projectedKPs ++ projectedOthers).take(aliasCandidateLimit) match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(p) => p
case ps => PartitioningCollection(ps)
}
}

/**
* Projects non-[[KeyedPartitioning]] partitionings through the current node's output expressions.
*
* With aliases, each partitioning expression is substituted with all possible alias combinations;
* without aliases, partitionings whose expressions reference attributes outside the output are
* dropped.
*/
private def projectOtherPartitionings(
partitionings: Seq[Partitioning]): LazyList[Partitioning] = {
if (hasAlias) {
partitionings.to(LazyList).flatMap {
case e: Expression =>
// We need unique partitionings but if the input partitioning is
// `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
// the projection we have 4 partitionings:
// `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then
// after the projection we have 4 partitionings:
// `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
// `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
// `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
val partitioningSet = mutable.Set.empty[Expression]
projectExpression(e)
.filter(e => partitioningSet.add(e.canonicalized))
.take(aliasCandidateLimit)
.asInstanceOf[LazyList[Partitioning]]
case o => Seq(o)
}.take(aliasCandidateLimit).toSeq
case o => LazyList(o)
}
} else {
// Filter valid partitiongs (only reference output attributes of the current plan node)
// Filter valid partitionings (only reference output attributes of the current plan node)
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
flattenPartitioning(child.outputPartitioning).filter {
partitionings.to(LazyList).filter {
case e: Expression => e.references.subsetOf(outputSet)
case _ => true
}
}
partitionings match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(p) => p
case ps => PartitioningCollection(ps)
}
}

  /**
   * Projects all input [[KeyedPartitioning]]s through the current node's output expressions.
   *
   * For each expression position (0..N-1), collects the unique expressions at that position across
   * all input KPs, projects each through the output aliases, and unions the alternatives.
   * Positions with at least one alternative are projectable; their count determines the maximum
   * achievable granularity. Positions that cannot be expressed in the output are dropped.
   *
   * The resulting [[KeyedPartitioning]]s are the cross-product of the per-position alternatives
   * restricted to the projectable positions. All share the same `partitionKeys` object (projected
   * to the same subset of positions), preserving the invariant required by [[GroupPartitionsExec]].
   *
   * @param kps input keyed partitionings. Assumed to share the same `partitionKeys` and the same
   *            number of expression positions, differing only in their expressions
   *            -- NOTE(review): confirm this invariant holds for every caller, since only
   *            `kps.head` is consulted for keys and position count.
   * @return lazily-evaluated projected partitionings; empty when `kps` is empty or when no
   *         expression position survives the projection.
   */
  private def projectKeyedPartitionings(
      kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = {
    if (kps.isEmpty) return LazyList.empty
    // All input KPs are assumed to have the same number of expression positions; see @param note.
    val numPositions = kps.head.expressions.length

    val alternativesPerPosition: IndexedSeq[LazyList[Expression]] =
      if (hasAlias) {
        // For each position, gather unique expressions across all KPs (ExpressionSet deduplicates
        // semantically equal expressions, e.g. the same join-key column shared by both KP sides)
        // and project each through the output aliases. `seen` additionally deduplicates the
        // projected alternatives by canonicalized form, since different source expressions can
        // project to the same output expression.
        (0 until numPositions).map { i =>
          val seen = mutable.Set.empty[Expression]
          ExpressionSet(kps.map(_.expressions(i))).to(LazyList).flatMap { expr =>
            projectExpression(expr).filter(e => seen.add(e.canonicalized))
          }
        }
      } else {
        // No aliases: filter out non-projectable expressions first, then deduplicate.
        // An expression survives only if it references nothing outside this node's output.
        val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
        (0 until numPositions).map { i =>
          ExpressionSet(kps.collect {
            case kp if kp.expressions(i).references.subsetOf(outputSet) => kp.expressions(i)
          }).to(LazyList)
        }
      }

    // Non-empty positions define the maximum achievable granularity.
    val projectablePositions =
      (0 until numPositions).filter(i => alternativesPerPosition(i).nonEmpty)

    if (projectablePositions.isEmpty) return LazyList.empty

    // All input KPs share the same partitionKeys by invariant; use the first as the key source.
    // When some positions were dropped, project the keys down to the surviving positions.
    val keySource = kps.head
    val sharedKeys =
      if (projectablePositions.length == numPositions) keySource.partitionKeys
      else keySource.projectKeys(projectablePositions)._2

    // Dropping positions can collapse previously-distinct keys, so groupedness is recomputed from
    // the (possibly projected) keys rather than inherited from the inputs.
    val isGrouped = sharedKeys.distinct.size == sharedKeys.size
    val isNarrowed = projectablePositions.length < numPositions

    // Cross-product the per-position alternatives to produce all concrete KPs.
    // Note: generateCartesianProduct expects thunks () => Seq[T], but wrapping LazyLists in thunks
    // here is not strictly necessary since they are already lazy -- we do it only to match the API.
    // No deduplication is needed here: per-position alternatives are already canonically distinct,
    // so all cross-product combinations are distinct by construction.
    MultiTransform.generateCartesianProduct(
      projectablePositions.map(i => () => alternativesPerPosition(i)))
      .map(projectedExprs =>
        new KeyedPartitioning(projectedExprs, sharedKeys, isGrouped, isNarrowed))
  }

private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,12 @@ case class EnsureRequirements(
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, leftPartitioning, None))
case (Some(KeyedPartitioning(clustering, _, _)), _) =>
case (Some(KeyedPartitioning(clustering, _, _, _)), _) =>
val leafExprs = clustering.flatMap(_.collectLeaves())
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, leftKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, None, rightPartitioning))
case (_, Some(KeyedPartitioning(clustering, _, _))) =>
case (_, Some(KeyedPartitioning(clustering, _, _, _))) =>
val leafExprs = clustering.flatMap(_.collectLeaves())
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, rightKeys)
.orElse(reorderJoinKeysRecursively(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ object ShuffleExchangeExec {
val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
row => projection(row)
case SinglePartition => identity
case KeyedPartitioning(expressions, _, _) =>
case KeyedPartitioning(expressions, _, _, _) =>
row => bindReferences(expressions, outputAttributes).map(_.eval(row))
case s: ShufflePartitionIdPassThrough =>
// For ShufflePartitionIdPassThrough, the expression directly evaluates to the partition ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class DistributionAndOrderingSuiteBase
plan: QueryPlan[T]): Partitioning = partitioning match {
case HashPartitioning(exprs, numPartitions) =>
HashPartitioning(exprs.map(resolveAttrs(_, plan)), numPartitions)
case KeyedPartitioning(expressions, partitionKeys, isGrouped) =>
case KeyedPartitioning(expressions, partitionKeys, isGrouped, _) =>
KeyedPartitioning(expressions.map(resolveAttrs(_, plan)), partitionKeys, isGrouped)
case PartitioningCollection(partitionings) =>
PartitioningCollection(partitionings.map(resolvePartitioning(_, plan)))
Expand Down
Loading