
Commit 59759f3

aokolnychyi authored and cloud-fan committed
[SPARK-54157][SQL] Fix refresh of DSv2 tables in Dataset
### What changes were proposed in this pull request?

This PR fixes the refresh of DSv2 tables in Dataset.

### Why are the changes needed?

Prior to this change, Spark would pin the version of DSv2 tables at load/resolution time. Any changes within the session would not be propagated to an analyzed but not yet executed Dataset, breaking parity with DSv1 tables. Changes in this PR are needed for the following reasons:

- Prevent scanning/joining inconsistent versions of the same table in one session (see tests).
- Prevent stale results upon external and session changes.
- Remove workarounds in DSv2 connectors by fixing the problem in Spark.

### Does this PR introduce _any_ user-facing change?

Yes, but this PR makes DSv2 Table behavior match the expected Spark semantics.

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52920 from aokolnychyi/spark-54157.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent be98a29 commit 59759f3
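
For readers unfamiliar with the symptom, here is a minimal sketch of the staleness this patch fixes. The catalog and table names are hypothetical; the snippet assumes a Spark shell with a DSv2 catalog registered as `cat`:

```scala
// Hypothetical DSv2 catalog `cat` and table `cat.db.events`.
val df = spark.table("cat.db.events") // analysis captures table metadata here

// A session change after analysis but before execution:
spark.sql("INSERT INTO cat.db.events VALUES (1, 'click')")

// Before this fix, df could execute against the table version pinned at
// load/resolution time and miss the inserted row; a DSv1 table would pick up
// the change. After this fix, the DSv2 table state is refreshed at execution.
df.show()
```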

File tree

17 files changed: +1234 −17 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 25 additions & 0 deletions
@@ -2210,6 +2210,31 @@
     ],
     "sqlState" : "42000"
   },
+  "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS" : {
+    "message" : [
+      "Detected incompatible changes to table <tableName> after DataFrame/Dataset has been resolved and analyzed, meaning the underlying plan is out of sync. Please, re-create DataFrame/Dataset before attempting to execute the query again."
+    ],
+    "subClass" : {
+      "COLUMNS_MISMATCH" : {
+        "message" : [
+          "Data columns have changed:",
+          "<errors>"
+        ]
+      },
+      "METADATA_COLUMNS_MISMATCH" : {
+        "message" : [
+          "Metadata columns have changed:",
+          "<errors>"
+        ]
+      },
+      "TABLE_ID_MISMATCH" : {
+        "message" : [
+          "Table ID has changed from <capturedTableId> to <currentTableId>."
+        ]
+      }
+    },
+    "sqlState" : "51024"
+  },
   "INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
     "message" : [
       "The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 10 additions & 0 deletions
@@ -50,6 +50,16 @@ public interface Table {
    */
   String name();
 
+  /**
+   * An ID of the table that can be used to reliably check if two table objects refer to the same
+   * metastore entity. If a table is dropped and recreated again with the same name, the new table
+   * ID must be different. This method must return null if connectors don't support the notion of
+   * table ID.
+   */
+  default String id() {
+    return null;
+  }
+
   /**
    * Returns the schema of this table. If the table is not readable and doesn't have a schema, an
    * empty schema can be returned here.
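
A minimal sketch of how a connector might satisfy the new `id()` contract; `MyTable` and its UUID scheme are illustrative assumptions, not part of this patch:

```scala
import java.util.UUID

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical connector table: a fresh UUID is minted when the metastore
// entity is created, so dropping and recreating a table under the same name
// yields a different id, as the contract requires.
class MyTable(tableName: String, tableSchema: StructType) extends Table {
  private val entityId = UUID.randomUUID().toString

  override def name(): String = tableName
  override def schema(): StructType = tableSchema
  override def id(): String = entityId
  override def capabilities(): java.util.Set[TableCapability] =
    java.util.Collections.emptySet[TableCapability]()
}
```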

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 9 additions & 1 deletion
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, M
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
+import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
@@ -687,7 +688,14 @@ case class ReplaceTableAsSelect(
     isAnalyzed: Boolean = false)
   extends V2CreateTableAsSelectPlan {
 
-  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
+  override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = {
+    // RTAS may drop and recreate the table before query execution, breaking self-references;
+    // refresh and pin versions here so the query reads from the original table versions
+    // instead of the newly created empty table that serves as the append/overwrite target
+    val refreshedQuery = V2TableRefreshUtil.refreshVersions(query)
+    val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
+    copy(query = pinnedQuery, isAnalyzed = true)
+  }
 
   override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
     this.copy(partitioning = rewritten)
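
The self-reference hazard that motivates refreshing and then pinning here, sketched with a hypothetical table name:

```scala
// RTAS reading from the very table it replaces (illustrative):
spark.sql("REPLACE TABLE cat.db.t AS SELECT * FROM cat.db.t WHERE id > 10")

// RTAS drops cat.db.t and creates an empty replacement to serve as the
// append/overwrite target before the query runs. If the SELECT were resolved
// against the latest table state at execution, it would read that empty
// table. Pinning at markAsAnalyzed keeps the SELECT on the original version.
```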
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala

Lines changed: 180 additions & 0 deletions

@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, MetadataColumnHelper}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.ArrayImplicits._
+
+private[sql] object V2TableUtil extends SQLConfHelper {
+
+  def toQualifiedName(catalog: CatalogPlugin, ident: Identifier): String = {
+    s"${quoteIfNeeded(catalog.name)}.${ident.quoted}"
+  }
+
+  /**
+   * Validates that captured data columns match the current table schema.
+   *
+   * @param table the current table metadata
+   * @param relation the relation with captured columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedColumns(table: Table, relation: DataSourceV2Relation): Seq[String] = {
+    validateCapturedColumns(table, relation.table.columns.toImmutableArraySeq)
+  }
+
+  /**
+   * Validates that captured data columns match the current table schema.
+   *
+   * Checks for:
+   * - Column type or nullability changes
+   * - Removed columns (missing from the current table schema)
+   * - Added columns (new in the current table schema)
+   *
+   * @param table the current table metadata
+   * @param originCols the originally captured columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedColumns(table: Table, originCols: Seq[Column]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+    val colsByNormalizedName = indexColumns(table.columns.toImmutableArraySeq)
+    val originColsByNormalizedName = indexColumns(originCols)
+
+    originColsByNormalizedName.foreach { case (normalizedName, originCol) =>
+      colsByNormalizedName.get(normalizedName) match {
+        case Some(col) =>
+          if (originCol.dataType != col.dataType || originCol.nullable != col.nullable) {
+            val oldType = formatType(originCol.dataType, originCol.nullable)
+            val newType = formatType(col.dataType, col.nullable)
+            errors += s"`${originCol.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatColumn(originCol)} has been removed"
+      }
+    }
+
+    colsByNormalizedName.foreach { case (normalizedName, col) =>
+      if (!originColsByNormalizedName.contains(normalizedName)) {
+        errors += s"${formatColumn(col)} has been added"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  /**
+   * Validates that captured metadata columns are consistent with the current table metadata.
+   *
+   * @param table the current table metadata
+   * @param relation the relation with captured metadata columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedMetadataColumns(table: Table, relation: DataSourceV2Relation): Seq[String] = {
+    validateCapturedMetadataColumns(table, extractMetadataColumns(relation))
+  }
+
+  // extracts original column info for all metadata attributes in relation
+  private def extractMetadataColumns(relation: DataSourceV2Relation): Seq[MetadataColumn] = {
+    val metaAttrs = relation.output.filter(_.isMetadataCol)
+    if (metaAttrs.nonEmpty) {
+      val metaCols = metadataColumns(relation.table)
+      val normalizedMetaAttrNames = metaAttrs.map(attr => normalize(attr.name)).toSet
+      metaCols.filter(col => normalizedMetaAttrNames.contains(normalize(col.name)))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Validates that captured metadata columns are consistent with the current table metadata.
+   *
+   * Checks for:
+   * - Metadata column type or nullability changes
+   * - Removed metadata columns (missing from current table)
+   *
+   * @param table the current table metadata
+   * @param originMetaCols the originally captured metadata columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedMetadataColumns(
+      table: Table,
+      originMetaCols: Seq[MetadataColumn]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+    val metaCols = metadataColumns(table)
+    val metaColsByNormalizedName = indexMetadataColumns(metaCols)
+
+    originMetaCols.foreach { originMetaCol =>
+      val normalizedName = normalize(originMetaCol.name)
+      metaColsByNormalizedName.get(normalizedName) match {
+        case Some(metaCol) =>
+          if (originMetaCol.dataType != metaCol.dataType ||
+              originMetaCol.isNullable != metaCol.isNullable) {
+            val oldType = formatType(originMetaCol.dataType, originMetaCol.isNullable)
+            val newType = formatType(metaCol.dataType, metaCol.isNullable)
+            errors += s"`${originMetaCol.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatMetadataColumn(originMetaCol)} has been removed"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  private def formatColumn(col: Column): String = {
+    s"`${col.name}` ${formatType(col.dataType, col.nullable)}"
+  }
+
+  private def formatMetadataColumn(col: MetadataColumn): String = {
+    s"`${col.name}` ${formatType(col.dataType, col.isNullable)}"
+  }
+
+  private def formatType(dataType: DataType, nullable: Boolean): String = {
+    if (nullable) dataType.sql else s"${dataType.sql} NOT NULL"
+  }
+
+  private def indexColumns(cols: Seq[Column]): Map[String, Column] = {
+    index(cols)(_.name)
+  }
+
+  private def indexMetadataColumns(cols: Seq[MetadataColumn]): Map[String, MetadataColumn] = {
+    index(cols)(_.name)
+  }
+
+  private def index[C](cols: Seq[C])(extractName: C => String): Map[String, C] = {
+    SchemaUtils.checkColumnNameDuplication(cols.map(extractName), conf.caseSensitiveAnalysis)
+    cols.map(col => normalize(extractName(col)) -> col).toMap
+  }
+
+  private def metadataColumns(table: Table): Seq[MetadataColumn] = table match {
+    case hasMeta: SupportsMetadataColumns => hasMeta.metadataColumns.toImmutableArraySeq
+    case _ => Seq.empty
+  }
+
+  private def normalize(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+  }
+}
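
A sketch of how these validators pair with the new error helpers at refresh time; `catalog`, `identifier`, `currentTable`, and `relation` are assumed bindings at a hypothetical call site, not code from this patch:

```scala
import org.apache.spark.sql.connector.catalog.V2TableUtil
import org.apache.spark.sql.errors.QueryCompilationErrors

// currentTable: freshly loaded from the catalog at execution time
// relation: the DataSourceV2Relation captured during analysis
val name = V2TableUtil.toQualifiedName(catalog, identifier)

val columnErrors = V2TableUtil.validateCapturedColumns(currentTable, relation)
if (columnErrors.nonEmpty) {
  throw QueryCompilationErrors.columnsChangedAfterAnalysis(name, columnErrors)
}

val metaColumnErrors =
  V2TableUtil.validateCapturedMetadataColumns(currentTable, relation)
if (metaColumnErrors.nonEmpty) {
  throw QueryCompilationErrors.metadataColumnsChangedAfterAnalysis(name, metaColumnErrors)
}
```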

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 32 additions & 0 deletions
@@ -2121,6 +2121,38 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     }
   }
 
+  def tableIdChangedAfterAnalysis(
+      tableName: String,
+      capturedTableId: String,
+      currentTableId: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "capturedTableId" -> capturedTableId,
+        "currentTableId" -> currentTableId))
+  }
+
+  def columnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
+  def metadataColumnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.METADATA_COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
   def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError(): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION",

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 6 additions & 6 deletions
@@ -23,8 +23,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -57,9 +58,8 @@ abstract class DataSourceV2RelationBase(
   }
 
   override def name: String = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     (catalog, identifier) match {
-      case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
+      case (Some(cat), Some(ident)) => V2TableUtil.toQualifiedName(cat, ident)
       case _ => table.name()
     }
   }
@@ -259,10 +259,10 @@ object ExtractV2Table {
 }
 
 object ExtractV2CatalogAndIdentifier {
-  def unapply(relation: DataSourceV2Relation): Option[(CatalogPlugin, Identifier)] = {
+  def unapply(relation: DataSourceV2Relation): Option[(TableCatalog, Identifier)] = {
     relation match {
       case DataSourceV2Relation(_, _, Some(catalog), Some(identifier), _, _) =>
-        Some((catalog, identifier))
+        Some((catalog.asTableCatalog, identifier))
       case _ =>
         None
     }