
Commit 09aadf7

szehon-ho authored and dongjoon-hyun committed
[SPARK-54172][SQL] Merge Into Schema Evolution should only add referenced columns
### What changes were proposed in this pull request?

Change the scope of MERGE INTO schema evolution: only add columns/nested fields that exist in the source and are directly assigned from the corresponding source column without transformation, e.g.

```
UPDATE SET new_col = source.new_col
UPDATE SET struct.new_field = source.struct.new_field
INSERT (old_col, new_col) VALUES (s.old_col, s.new_col)
```

### Why are the changes needed?

#51698 added schema evolution support for MERGE INTO statements, but its scope is too broad. In some cases the source table has many more fields than the target table, while the user only needs a few new ones added to the target for the MERGE INTO statement.

### Does this PR introduce _any_ user-facing change?

No. MERGE INTO schema evolution has not yet been released in Spark 4.1.

### How was this patch tested?

Added many unit tests in MergeIntoTableSuiteBase.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52866 from szehon-ho/merge_schema_evolution_limit_cols.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 03eb023)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
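For illustration, a minimal sketch of the narrowed scope (hypothetical table and column names; assumes the `MERGE WITH SCHEMA EVOLUTION` syntax from the #51698 work):

```scala
// Hypothetical schemas: target(id, value), source(id, value, new_col, unrelated_col).
spark.sql("""
  MERGE WITH SCHEMA EVOLUTION INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.new_col = s.new_col
  WHEN NOT MATCHED THEN INSERT (id, value, new_col) VALUES (s.id, s.value, s.new_col)
""")
// With this change, only new_col is added to the target schema; unrelated_col is not,
// because it is never directly assigned from a source column.
```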
1 parent d5d9ab0 commit 09aadf7

File tree

6 files changed: +1197 -189 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 62 additions & 24 deletions
```diff
@@ -1670,7 +1670,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case u: UpdateTable => resolveReferencesInUpdate(u)

       case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
-          if !m.resolved && targetTable.resolved && sourceTable.resolved && !m.needSchemaEvolution =>
+          if !m.resolved && targetTable.resolved && sourceTable.resolved =>
+
+        // Do not throw exception for schema evolution case.
+        // This allows unresolved assignment keys a chance to be resolved by a second pass
+        // by newly column/nested fields added by schema evolution.
+        val throws = !m.schemaEvolutionEnabled

         EliminateSubqueryAliases(targetTable) match {
           case r: NamedRelation if r.skipSchemaResolution =>
@@ -1680,6 +1685,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             m

           case _ =>
+            def findAttrInTarget(name: String): Option[Attribute] = {
+              targetTable.output.find(targetAttr => conf.resolver(name, targetAttr.name))
+            }
             val newMatchedActions = m.matchedActions.map {
               case DeleteAction(deleteCondition) =>
                 val resolvedDeleteCondition = deleteCondition.map(
@@ -1691,18 +1699,30 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                 UpdateAction(
                   resolvedUpdateCondition,
                   // The update value can access columns from both target and source tables.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH,
+                    throws = throws))
               case UpdateStarAction(updateCondition) =>
-                // Use only source columns. Missing columns in target will be handled in
-                // ResolveRowLevelCommandAssignments.
-                val assignments = targetTable.output.flatMap{ targetAttr =>
-                  sourceTable.output.find(
-                    sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
-                    .map(Assignment(targetAttr, _))}
+                // Expand star to top level source columns. If source has less columns than target,
+                // assignments will be added by ResolveRowLevelCommandAssignments later.
+                val assignments = if (m.schemaEvolutionEnabled) {
+                  // For schema evolution case, generate assignments for missing target columns.
+                  // These columns will be added by ResolveMergeIntoTableSchemaEvolution later.
+                  sourceTable.output.map { sourceAttr =>
+                    val key = findAttrInTarget(sourceAttr.name).getOrElse(
+                      UnresolvedAttribute(sourceAttr.name))
+                    Assignment(key, sourceAttr)
+                  }
+                } else {
+                  sourceTable.output.flatMap { sourceAttr =>
+                    findAttrInTarget(sourceAttr.name).map(
+                      targetAttr => Assignment(targetAttr, sourceAttr))
+                  }
+                }
                 UpdateAction(
                   updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                   // For UPDATE *, the value must be from source table.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
+                    throws = throws))
               case o => o
             }
             val newNotMatchedActions = m.notMatchedActions.map {
@@ -1713,21 +1733,33 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   resolveExpressionByPlanOutput(_, m.sourceTable))
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
+                    throws = throws))
               case InsertStarAction(insertCondition) =>
                 // The insert action is used when not matched, so its condition and value can only
                 // access columns from the source table.
                 val resolvedInsertCondition = insertCondition.map(
                   resolveExpressionByPlanOutput(_, m.sourceTable))
-                // Use only source columns. Missing columns in target will be handled in
-                // ResolveRowLevelCommandAssignments.
-                val assignments = targetTable.output.flatMap{ targetAttr =>
-                  sourceTable.output.find(
-                    sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
-                    .map(Assignment(targetAttr, _))}
+                // Expand star to top level source columns. If source has less columns than target,
+                // assignments will be added by ResolveRowLevelCommandAssignments later.
+                val assignments = if (m.schemaEvolutionEnabled) {
+                  // For schema evolution case, generate assignments for missing target columns.
+                  // These columns will be added by ResolveMergeIntoTableSchemaEvolution later.
+                  sourceTable.output.map { sourceAttr =>
+                    val key = findAttrInTarget(sourceAttr.name).getOrElse(
+                      UnresolvedAttribute(sourceAttr.name))
+                    Assignment(key, sourceAttr)
+                  }
+                } else {
+                  sourceTable.output.flatMap { sourceAttr =>
+                    findAttrInTarget(sourceAttr.name).map(
+                      targetAttr => Assignment(targetAttr, sourceAttr))
+                  }
+                }
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
+                    throws = throws))
               case o => o
             }
             val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map {
@@ -1741,7 +1773,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                 UpdateAction(
                   resolvedUpdateCondition,
                   // The update value can access columns from the target table only.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.TARGET))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.TARGET,
+                    throws = throws))
               case o => o
             }

@@ -1818,11 +1851,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     def resolveAssignments(
         assignments: Seq[Assignment],
         mergeInto: MergeIntoTable,
-        resolvePolicy: MergeResolvePolicy.Value): Seq[Assignment] = {
+        resolvePolicy: MergeResolvePolicy.Value,
+        throws: Boolean): Seq[Assignment] = {
       assignments.map { assign =>
         val resolvedKey = assign.key match {
           case c if !c.resolved =>
-            resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
+            resolveMergeExpr(c, Project(Nil, mergeInto.targetTable), throws)
           case o => o
         }
         val resolvedValue = assign.value match {
@@ -1842,17 +1876,21 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             } else {
               resolvedExpr
             }
-            checkResolvedMergeExpr(withDefaultResolved, resolvePlan)
+            if (throws) {
+              checkResolvedMergeExpr(withDefaultResolved, resolvePlan)
+            }
             withDefaultResolved
           case o => o
         }
         Assignment(resolvedKey, resolvedValue)
       }
     }

-    private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): Expression = {
-      val resolved = resolveExprInAssignment(e, p)
-      checkResolvedMergeExpr(resolved, p)
+    private def resolveMergeExpr(e: Expression, p: LogicalPlan, throws: Boolean): Expression = {
+      val resolved = resolveExprInAssignment(e, p, throws)
+      if (throws) {
+        checkResolvedMergeExpr(resolved, p)
+      }
       resolved
     }
```
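The key mechanism above is the `throws` flag: when schema evolution is enabled, the first resolution pass no longer fails on an assignment key missing from the target, leaving it unresolved so that `ResolveMergeIntoSchemaEvolution` can add the column and a later pass can resolve it. A toy model of that two-pass idea (plain Scala, not the Analyzer itself):

```scala
// Toy model of deferred resolution; names and types are illustrative only.
sealed trait Key
case class ResolvedKey(name: String) extends Key
case class UnresolvedKey(name: String) extends Key

def resolveKey(name: String, targetCols: Set[String], throws: Boolean): Key =
  if (targetCols.contains(name)) ResolvedKey(name)
  else if (throws) sys.error(s"cannot resolve column $name")
  else UnresolvedKey(name) // kept around for a second resolution pass

// Pass 1, schema evolution enabled (throws = false): the key survives unresolved.
val pass1 = resolveKey("new_col", Set("id", "value"), throws = false)
// ...schema evolution adds new_col to the target...
// Pass 2: the same key now resolves against the evolved schema.
val pass2 = resolveKey("new_col", Set("id", "value", "new_col"), throws = true)
```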

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala

Lines changed: 11 additions & 4 deletions
```diff
@@ -425,7 +425,8 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   def resolveExpressionByPlanChildren(
       e: Expression,
       q: LogicalPlan,
-      includeLastResort: Boolean = false): Expression = {
+      includeLastResort: Boolean = false,
+      throws: Boolean = true): Expression = {
     resolveExpression(
       tryResolveDataFrameColumns(e, q.children),
       resolveColumnByName = nameParts => {
@@ -435,7 +436,7 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
         assert(q.children.length == 1)
         q.children.head.output
       },
-      throws = true,
+      throws,
       includeLastResort = includeLastResort)
   }

@@ -475,8 +476,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
     resolveVariables(resolveOuterRef(e))
   }

-  def resolveExprInAssignment(expr: Expression, hostPlan: LogicalPlan): Expression = {
-    resolveExpressionByPlanChildren(expr, hostPlan) match {
+  def resolveExprInAssignment(
+      expr: Expression,
+      hostPlan: LogicalPlan,
+      throws: Boolean = true): Expression = {
+    resolveExpressionByPlanChildren(expr,
+      hostPlan,
+      includeLastResort = false,
+      throws = throws) match {
       // Assignment key and value does not need the alias when resolving nested columns.
       case Alias(child: ExtractValue, _) => child
       case other => other
```
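As a usage sketch, the new default parameter keeps existing call sites unchanged while letting the MERGE resolution path opt out of eager failure (the call sites below are hypothetical):

```scala
// Inside a ColumnResolutionHelper implementation; illustrative only.
// Default behavior is unchanged: unresolvable columns still fail eagerly.
val strictKey = resolveExprInAssignment(assign.key, mergeInto)
// Schema evolution path: leave an unknown assignment key unresolved so a
// second pass can resolve it after the target schema has evolved.
val lenientKey = resolveExprInAssignment(assign.key, mergeInto, throws = false)
```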

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala

Lines changed: 25 additions & 10 deletions
```diff
@@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsRowLevelOperations, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.StructType


 /**
@@ -34,24 +35,38 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {

   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case m @ MergeIntoTable(_, _, _, _, _, _, _)
-      if m.needSchemaEvolution =>
-      val newTarget = m.targetTable.transform {
-        case r : DataSourceV2Relation => performSchemaEvolution(r, m.sourceTable)
+    // This rule should run only if all assignments are resolved, except those
+    // that will be satisfied by schema evolution
+    case m@MergeIntoTable(_, _, _, _, _, _, _) if m.evaluateSchemaEvolution =>
+      val changes = m.changesForSchemaEvolution
+      if (changes.isEmpty) {
+        m
+      } else {
+        m transformUpWithNewOutput {
+          case r @ DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _, _) =>
+            val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(m)
+            val newTarget = performSchemaEvolution(r, referencedSourceSchema, changes)
+            val oldTargetOutput = m.targetTable.output
+            val newTargetOutput = newTarget.output
+            val attributeMapping = oldTargetOutput.map(
+              oldAttr => (oldAttr, newTargetOutput.find(_.name == oldAttr.name).getOrElse(oldAttr))
+            )
+            newTarget -> attributeMapping
       }
-      m.copy(targetTable = newTarget)
+      }
   }

-  private def performSchemaEvolution(relation: DataSourceV2Relation, source: LogicalPlan)
-    : DataSourceV2Relation = {
+  private def performSchemaEvolution(
+      relation: DataSourceV2Relation,
+      referencedSourceSchema: StructType,
+      changes: Array[TableChange]): DataSourceV2Relation = {
     (relation.catalog, relation.identifier) match {
       case (Some(c: TableCatalog), Some(i)) =>
-        val changes = MergeIntoTable.schemaChanges(relation.schema, source.schema)
         c.alterTable(i, changes: _*)
         val newTable = c.loadTable(i)
         val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
         // Check if there are any remaining changes not applied.
-        val remainingChanges = MergeIntoTable.schemaChanges(newSchema, source.schema)
+        val remainingChanges = MergeIntoTable.schemaChanges(newSchema, referencedSourceSchema)
         if (remainingChanges.nonEmpty) {
           throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
             remainingChanges, i.toQualifiedNameParts(c))
```
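To summarize the control flow of this rule, a condensed sketch (the helper name `evolveTarget` is ours; `MergeIntoTable.schemaChanges` and the catalog calls are the ones used in the diff, and the real rule reports leftover changes via `QueryCompilationErrors` rather than `require`):

```scala
import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.types.StructType

// Apply only the changes derived from referenced source columns, reload the
// table, and fail if the catalog could not apply any required change.
def evolveTarget(
    catalog: TableCatalog,
    ident: Identifier,
    changes: Array[TableChange],
    referencedSourceSchema: StructType): StructType = {
  catalog.alterTable(ident, changes: _*)
  val newSchema = CatalogV2Util.v2ColumnsToStructType(catalog.loadTable(ident).columns())
  // Anything still missing relative to the referenced source schema was rejected.
  val remaining = MergeIntoTable.schemaChanges(newSchema, referencedSourceSchema)
  require(remaining.isEmpty, s"catalog could not apply: ${remaining.mkString(", ")}")
  newSchema
}
```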
