-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-55848][SQL][4.1] Fix incorrect dedup results with SPJ partial clustering #54751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.functions._ | |
| import org.apache.spark.sql.connector.distributions.Distributions | ||
| import org.apache.spark.sql.connector.expressions._ | ||
| import org.apache.spark.sql.connector.expressions.Expressions._ | ||
| import org.apache.spark.sql.execution.SparkPlan | ||
| import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan} | ||
| import org.apache.spark.sql.execution.datasources.v2.BatchScanExec | ||
| import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation | ||
| import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec | ||
|
|
@@ -94,13 +94,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { | |
|
|
||
| checkQueryPlan(df, catalystDistribution, | ||
| physical.KeyGroupedPartitioning(catalystDistribution.clustering, projectedPositions, | ||
| partitionValues, partitionValues)) | ||
| partitionValues, partitionValues, isPartiallyClustered = false)) | ||
|
|
||
| // multiple group keys should work too as long as partition keys are subset of them | ||
| df = sql(s"SELECT count(*) FROM testcat.ns.$table GROUP BY id, ts") | ||
| checkQueryPlan(df, catalystDistribution, | ||
| physical.KeyGroupedPartitioning(catalystDistribution.clustering, projectedPositions, | ||
| partitionValues, partitionValues)) | ||
| partitionValues, partitionValues, isPartiallyClustered = false)) | ||
| } | ||
|
|
||
| test("non-clustered distribution: no partition") { | ||
|
|
@@ -2892,4 +2892,150 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { | |
| Row("ccc", 30, 400.50))) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-55848: dropDuplicates after SPJ with partial clustering should produce " + | ||
| "correct results") { | ||
| val items_partitions = Array(identity("id")) | ||
| createTable(items, itemsColumns, items_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$items VALUES " + | ||
| "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + | ||
| "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| val purchases_partitions = Array(identity("item_id")) | ||
| createTable(purchases, purchasesColumns, purchases_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$purchases VALUES " + | ||
| "(1, 42.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 50.0, cast('2020-01-02' as timestamp)), " + | ||
| "(2, 11.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 19.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| withSQLConf( | ||
| SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) { | ||
| // dropDuplicates on the join key after a partially-clustered SPJ must still | ||
| // produce the correct number of distinct ids. Before the fix, the | ||
| // partially-clustered partitioning was incorrectly treated as satisfying | ||
| // ClusteredDistribution, so EnsureRequirements did not insert an Exchange | ||
| // before the dedup, leading to duplicate rows. | ||
| val df = sql( | ||
| s""" | ||
| |SELECT DISTINCT i.id | ||
| |FROM testcat.ns.$items i | ||
| |JOIN testcat.ns.$purchases p ON i.id = p.item_id | ||
| |""".stripMargin) | ||
| checkAnswer(df, Seq(Row(1), Row(2), Row(3))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, | ||
| "should contain a shuffle for the post-join dedup with partial clustering") | ||
|
|
||
| val scans = collectScans(df.queryExecution.executedPlan) | ||
| assert(scans.exists(_.outputPartitioning match { | ||
| case kgp: physical.KeyGroupedPartitioning => kgp.isPartiallyClustered | ||
| case _ => false | ||
| }), "at least one BatchScanExec should have partially-clustered KeyGroupedPartitioning") | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-55848: Window dedup after SPJ with partial clustering should produce " + | ||
| "correct results") { | ||
| val items_partitions = Array(identity("id")) | ||
| createTable(items, itemsColumns, items_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$items VALUES " + | ||
| "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + | ||
| "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| val purchases_partitions = Array(identity("item_id")) | ||
| createTable(purchases, purchasesColumns, purchases_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$purchases VALUES " + | ||
| "(1, 42.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 50.0, cast('2020-01-02' as timestamp)), " + | ||
| "(2, 11.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 19.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| withSQLConf( | ||
| SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) { | ||
| // Use ROW_NUMBER() OVER to dedup joined rows per id after a partially-clustered | ||
| // SPJ. The WINDOW operator requires ClusteredDistribution on i.id; with partial | ||
| // clustering the plan must insert a shuffle so that the window | ||
| // produces exactly one row per id. | ||
| val df = sql( | ||
| s""" | ||
| |SELECT id, price FROM ( | ||
| | SELECT i.id, i.price, | ||
| | ROW_NUMBER() OVER (PARTITION BY i.id ORDER BY i.price DESC) AS rn | ||
| | FROM testcat.ns.$items i | ||
| | JOIN testcat.ns.$purchases p ON i.id = p.item_id | ||
| |) t WHERE rn = 1 | ||
| |""".stripMargin) | ||
| checkAnswer(df, Seq(Row(1, 41.0f), Row(2, 10.0f), Row(3, 15.5f))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Besides checking the presence of shuffle, can you please also check if we have partially clustered `KeyGroupedPartitioning` from the scans? |
||
| "should contain a shuffle for the post-join window with partial clustering") | ||
|
|
||
| val scans = collectScans(df.queryExecution.executedPlan) | ||
| assert(scans.exists(_.outputPartitioning match { | ||
| case kgp: physical.KeyGroupedPartitioning => kgp.isPartiallyClustered | ||
| case _ => false | ||
| }), "at least one BatchScanExec should have partially-clustered KeyGroupedPartitioning") | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-55848: checkpointed partially-clustered join with dedup") { | ||
| withTempDir { dir => | ||
| spark.sparkContext.setCheckpointDir(dir.getPath) | ||
| val items_partitions = Array(identity("id")) | ||
| createTable(items, itemsColumns, items_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$items VALUES " + | ||
| "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + | ||
| "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| val purchases_partitions = Array(identity("item_id")) | ||
| createTable(purchases, purchasesColumns, purchases_partitions) | ||
| sql(s"INSERT INTO testcat.ns.$purchases VALUES " + | ||
| "(1, 42.0, cast('2020-01-01' as timestamp)), " + | ||
| "(1, 50.0, cast('2020-01-02' as timestamp)), " + | ||
| "(2, 11.0, cast('2020-01-01' as timestamp)), " + | ||
| "(3, 19.5, cast('2020-02-01' as timestamp))") | ||
|
|
||
| withSQLConf( | ||
| SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", | ||
| SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", | ||
| SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) { | ||
| // Checkpoint the JOIN result (not the scan) so the plan behind the | ||
| // checkpoint carries partially-clustered KeyGroupedPartitioning. | ||
| // The dedup on top must still insert an Exchange because the | ||
| // isPartiallyClustered flag causes satisfies0()=false for | ||
| // ClusteredDistribution. | ||
| val joinedDf = spark.sql( | ||
| s"""SELECT i.id, i.name, i.price | ||
| |FROM testcat.ns.$items i | ||
| |JOIN testcat.ns.$purchases p ON i.id = p.item_id""".stripMargin) | ||
| val checkpointedDf = joinedDf.checkpoint() | ||
| val df = checkpointedDf.select("id").distinct() | ||
|
|
||
| checkAnswer(df, Seq(Row(1), Row(2), Row(3))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Besides checking the presence of shuffle, can you please also check if we have partially clustered `KeyGroupedPartitioning` from the scans? |
||
| "should contain a shuffle for the dedup after checkpointed " + | ||
| "partially-clustered join") | ||
|
|
||
| val rddScans = collect(df.queryExecution.executedPlan) { | ||
| case r: RDDScanExec => r | ||
| } | ||
| assert(rddScans.exists(_.outputPartitioning match { | ||
| case kgp: physical.KeyGroupedPartitioning => kgp.isPartiallyClustered | ||
| case _ => false | ||
| }), "checkpoint (RDDScanExec) should have " + | ||
| "partially-clustered KeyGroupedPartitioning") | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides checking the presence of shuffle, can you please also check if we have partially clustered
`KeyGroupedPartitioning` from the scans?