
Commit 75fdec3

asl3 authored and dongjoon-hyun committed
[SPARK-54310][SQL] Add numSourceRows metric for MergeIntoExec
### What changes were proposed in this pull request?

Add a `numSourceRows` metric for `MergeIntoExec`, derived from the source node's `numOutputRows` metric. The assumption is that the relevant child node exposes `numOutputRows`; if it is not found, `numSourceRows` is reported as -1.

### Why are the changes needed?

To improve the completeness and debuggability of MERGE INTO metrics.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test cases for the `numSourceRows` metric.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52669 from asl3/merge-numsourcerows.

Authored-by: Amanda Liu <amanda.liu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent fb8dbd7 commit 75fdec3
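The -1 fallback described above is the same "not found" convention the other `MergeSummary` fields already use. A minimal sketch of the lookup-with-fallback idiom, using a plain `Map[String, Long]` as a hypothetical stand-in for a plan node's metrics map (real metrics hold `SQLMetric` objects, hence the extra `.map(_.value)` step in the actual change below):

```scala
// Hypothetical stand-in for a SparkPlan node's metrics map.
val metrics: Map[String, Long] = Map("numOutputRows" -> 42L)

// Metric present: report its value.
val found = metrics.get("numOutputRows").getOrElse(-1L)   // 42

// Metric absent: fall back to the -1 "not found" sentinel.
val missing = metrics.get("numSourceRows").getOrElse(-1L) // -1
```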

File tree: 4 files changed (+346, -227 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -27,6 +27,11 @@
 @Evolving
 public interface MergeSummary extends WriteSummary {
 
+  /**
+   * Returns the number of source rows.
+   */
+  long numSourceRows();
+
   /**
    * Returns the number of target rows copied unmodified because they did not match any action,
    * or -1 if not found.
```
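For context, a hedged sketch of how a connector might read the new accessor once it receives a `WriteSummary` at commit time (the delivery path is outside this diff; `logMergeSourceRows` is a hypothetical helper, not part of the change):

```scala
import org.apache.spark.sql.connector.write.{MergeSummary, WriteSummary}

// Hypothetical helper: inspect a summary and report the new metric if present.
def logMergeSourceRows(summary: WriteSummary): Unit = summary match {
  case m: MergeSummary =>
    val n = m.numSourceRows() // -1 when the metric could not be derived
    if (n >= 0) println(s"MERGE read $n source rows")
    else println("numSourceRows unavailable")
  case _ => // not a MERGE summary; nothing to report
}
```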

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -21,6 +21,7 @@ package org.apache.spark.sql.connector.write
  * Implementation of [[MergeSummary]] that provides MERGE operation summary.
  */
 private[sql] case class MergeSummaryImpl(
+    numSourceRows: Long,
     numTargetRowsCopied: Long,
     numTargetRowsDeleted: Long,
     numTargetRowsUpdated: Long,
```
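Note that the new field is prepended, so positional call sites shift by one; the `WriteToDataSourceV2Exec` change below passes `numSourceRows` first for exactly this reason. A toy illustration with hypothetical case classes, not the real ones:

```scala
// Before: two positional fields.
case class SummaryV1(copied: Long, deleted: Long)
// After: prepending a field moves every positional argument over by one.
case class SummaryV2(sourceRows: Long, copied: Long, deleted: Long)

val s = SummaryV2(100L, 80L, 5L) // sourceRows must now come first
```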

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 38 additions & 1 deletion
```diff
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -481,7 +482,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
   private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
     collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
       val metrics = n.metrics
+      val numSourceRows = getNumSourceRows(n)
       MergeSummaryImpl(
+        numSourceRows,
         metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
@@ -493,6 +496,40 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
       )
     }
   }
+
+  private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
+    def hasTargetTable(plan: SparkPlan): Boolean = {
+      collectFirst(plan) {
+        case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => scan
+      }.isDefined
+    }
+
+    def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = {
+      val leftHasTarget = hasTargetTable(join.left)
+      val rightHasTarget = hasTargetTable(join.right)
+
+      val sourceSide = if (leftHasTarget) {
+        Some(join.right)
+      } else if (rightHasTarget) {
+        Some(join.left)
+      } else {
+        None
+      }
+
+      sourceSide.flatMap { side =>
+        collectFirst(side) {
+          case source if source.metrics.contains("numOutputRows") =>
+            source
+        }
+      }
+    }
+
+    (for {
+      join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
+      sourceScan <- findSourceScan(join)
+      metric <- sourceScan.metrics.get("numOutputRows")
+    } yield metric.value).getOrElse(-1L)
+  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
```
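To make the traversal in `getNumSourceRows` concrete: it finds the first join under `MergeRowsExec`, decides which side contains the target-table scan, and reads `numOutputRows` from the first node on the opposite (source) side, returning -1 if any step fails. Below is a self-contained toy model of that logic, with hypothetical `Node`/`Scan`/`Join` types standing in for `SparkPlan`, `BatchScanExec`, and `BaseJoinExec`:

```scala
// Toy plan nodes; the real code works on SparkPlan trees.
sealed trait Node { def children: Seq[Node]; def metrics: Map[String, Long] }
case class Scan(isTarget: Boolean, metrics: Map[String, Long]) extends Node {
  def children: Seq[Node] = Nil
}
case class Join(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def metrics: Map[String, Long] = Map.empty
}

// Pre-order search for the first node matching the partial function,
// mirroring the collectFirst helper used in the diff.
def collectFirst[T](n: Node)(pf: PartialFunction[Node, T]): Option[T] =
  pf.lift(n).orElse(n.children.view.flatMap(c => collectFirst(c)(pf)).headOption)

def numSourceRows(plan: Node): Long = {
  def hasTarget(n: Node): Boolean =
    collectFirst(n) { case s: Scan if s.isTarget => s }.isDefined
  (for {
    join <- collectFirst(plan) { case j: Join => j }          // first join
    source <- if (hasTarget(join.left)) Some(join.right)     // target on left
              else if (hasTarget(join.right)) Some(join.left) // target on right
              else None                                       // ambiguous: give up
    rows <- collectFirst(source) {                            // first node with the metric
      case n if n.metrics.contains("numOutputRows") => n.metrics("numOutputRows")
    }
  } yield rows).getOrElse(-1L)                                // -1 on any failure
}

// The target scan sits on the left, so the right side's numOutputRows (7) wins.
val plan = Join(
  Scan(isTarget = true, Map.empty),
  Scan(isTarget = false, Map("numOutputRows" -> 7L)))
assert(numSourceRows(plan) == 7L)
```

The same structure explains the -1 cases in the real change: no join under `MergeRowsExec`, no identifiable target side, or no descendant with a `numOutputRows` metric all short-circuit the for-comprehension to `None`.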
