
Commit f1d8c9c

aokolnychyi authored and huangxiaopingRD committed
[SPARK-53924] Reload DSv2 tables in views created using plans on each view access
### What changes were proposed in this pull request?

This PR makes Spark reload DSv2 tables in views created using plans on each view access.

### Why are the changes needed?

The current problem is that the view definition in the session catalog captures the analyzed plan, which references a `Table` (and is thus supposed to pin the version). If a connector doesn't have an internal cache and produces a new `Table` object on each load, the table referenced in the view becomes orphaned and there is no way to refresh it unless that `Table` instance auto-refreshes on each scan (super dangerous).

### Does this PR introduce _any_ user-facing change?

Yes, but it restores the correct behavior without requiring hacks in connectors.

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52876 from aokolnychyi/spark-53924.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
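As a rough sketch of the restored behavior (illustrative only, not part of the patch; `testcat` and `ns.t` are hypothetical names for a DSv2 catalog whose connector returns a fresh `Table` object on every load):

```scala
spark.sql("CREATE TABLE testcat.ns.t (id INT, data STRING)")

// The temp view captures an analyzed plan; with this change, the DSv2 relation
// in that plan is stored as a V2TableReference placeholder instead of pinning
// a single Table instance.
spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.t")

spark.sql("INSERT INTO testcat.ns.t VALUES (1, 'a')")

// Each view access re-resolves the placeholder, reloading the table from the
// catalog, so the row above is visible instead of reading an orphaned snapshot.
spark.sql("SELECT * FROM v").show()
```

Incompatible column changes to `testcat.ns.t` after view creation instead fail the next view access with the new `INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION` error condition introduced below.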
1 parent 33bdbb0 commit f1d8c9c

File tree

10 files changed: +564, -6 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 9 additions & 0 deletions
@@ -2128,6 +2128,15 @@
     ],
     "sqlState" : "42000"
   },
+  "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" : {
+    "message" : [
+      "View <viewName> plan references table <tableName> whose <colType> columns changed since the view plan was initially captured.",
+      "Column changes:",
+      "<errors>",
+      "This indicates the table has evolved and the view based on the plan must be recreated."
+    ],
+    "sqlState" : "51024"
+  },
   "INCOMPATIBLE_COLUMN_TYPE" : {
     "message" : [
       "<operator> can only be performed on tables with compatible column types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is <dataType1> type which is not compatible with <dataType2> at the same column of the first table.<hint>."

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 20 additions & 4 deletions
@@ -1185,6 +1185,18 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       }
     }
 
+    private def resolveAsV2Relation(plan: LogicalPlan): Option[DataSourceV2Relation] = {
+      plan match {
+        case ref: V2TableReference =>
+          EliminateSubqueryAliases(relationResolution.resolveReference(ref)) match {
+            case r: DataSourceV2Relation => Some(r)
+            case _ => None
+          }
+        case r: DataSourceV2Relation => Some(r)
+        case _ => None
+      }
+    }
+
     def apply(plan: LogicalPlan)
       : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
       case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
@@ -1210,20 +1222,24 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
           resolveRelation(u).map(unwrapRelationPlan).map {
             case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
               v.desc.identifier, write)
-            case r: DataSourceV2Relation => write.withNewTable(r)
             case u: UnresolvedCatalogRelation =>
               throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
                 u.tableMeta.identifier, write)
-            case other =>
-              throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
-                u.multipartIdentifier.quoted)
+            case plan =>
+              resolveAsV2Relation(plan).map(write.withNewTable).getOrElse {
+                throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
+                  u.multipartIdentifier.quoted)
+              }
           }.getOrElse(write)
         case _ => write
       }
 
       case u: UnresolvedRelation =>
         resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
 
+      case r: V2TableReference =>
+        relationResolution.resolveReference(r)
+
       case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
           if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
         val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
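The write-path rework above matters for INSERTs targeting a temporary view defined directly over a DSv2 table, a case previously handled by the deleted `case r: DataSourceV2Relation` arm. A minimal sketch, assuming the hypothetical names `testcat.ns.t` and `v`:

```scala
// The view's stored plan is now a V2TableReference placeholder rather than a
// DataSourceV2Relation, so the write path first resolves the reference via
// resolveAsV2Relation and only then binds the write to the resolved table.
spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.t")
spark.sql("INSERT INTO v VALUES (1, 'a')") // still resolves to testcat.ns.t

// Temp views whose plans are not a plain table reference remain non-writable
// and keep failing with the write-into-temp-view error.
```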

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 39 additions & 0 deletions
@@ -225,6 +225,45 @@ class RelationResolution(override val catalogManager: CatalogManager)
     }
   }
 
+  def resolveReference(ref: V2TableReference): LogicalPlan = {
+    val relation = getOrLoadRelation(ref)
+    val planId = ref.getTagValue(LogicalPlan.PLAN_ID_TAG)
+    cloneWithPlanId(relation, planId)
+  }
+
+  private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
+    val key = toCacheKey(ref.catalog, ref.identifier)
+    relationCache.get(key) match {
+      case Some(cached) =>
+        adaptCachedRelation(cached, ref)
+      case None =>
+        val relation = loadRelation(ref)
+        relationCache.update(key, relation)
+        relation
+    }
+  }
+
+  private def loadRelation(ref: V2TableReference): LogicalPlan = {
+    val table = ref.catalog.loadTable(ref.identifier)
+    V2TableReferenceUtils.validateLoadedTable(table, ref)
+    val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
+    SubqueryAlias(tableName, ref.toRelation(table))
+  }
+
+  private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = {
+    cached transform {
+      case r: DataSourceV2Relation if matchesReference(r, ref) =>
+        V2TableReferenceUtils.validateLoadedTable(r.table, ref)
+        r.copy(output = ref.output, options = ref.options)
+    }
+  }
+
+  private def matchesReference(
+      relation: DataSourceV2Relation,
+      ref: V2TableReference): Boolean = {
+    relation.catalog.contains(ref.catalog) && relation.identifier.contains(ref.identifier)
+  }
+
   private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
 
   private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
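One consequence of routing loads through `relationCache` (a hedged reading of `getOrLoadRelation`, with hypothetical names): within a single query, repeated references to the same table resolve against one cached relation, so both sides of a self-join through the view see the same table version.

```scala
spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.t")

// The first V2TableReference resolution loads testcat.ns.t, validates it, and
// caches the relation; the second reuses the cached relation, adapted to its
// own output attributes and options by adaptCachedRelation.
spark.sql("SELECT * FROM v a JOIN v b ON a.id = b.id").show()
```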
V2TableReference.scala (new file; the page omits its path, but the package declaration implies sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/)

Lines changed: 139 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.Column
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.V2TableUtil
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

/**
 * A reference to a V2 table.
 *
 * References are placeholders for the latest table metadata and are replaced with actual table
 * versions during analysis, allowing Spark to reload tables with up-to-date metadata. The newly
 * loaded table metadata is validated against the original metadata depending on the context.
 * For instance, temporary views with fully resolved logical plans don't allow schema changes
 * in underlying tables.
 */
private[sql] case class V2TableReference private(
    catalog: TableCatalog,
    identifier: Identifier,
    options: CaseInsensitiveStringMap,
    info: TableInfo,
    output: Seq[AttributeReference],
    context: Context)
  extends LeafNode with MultiInstanceRelation with NamedRelation {

  override def name: String = V2TableUtil.toQualifiedName(catalog, identifier)

  override def newInstance(): V2TableReference = {
    copy(output = output.map(_.newInstance()))
  }

  override def computeStats(): Statistics = Statistics.DUMMY

  override def simpleString(maxFields: Int): String = {
    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
    s"TableReference$outputString $name"
  }

  def toRelation(table: Table): DataSourceV2Relation = {
    DataSourceV2Relation(table, output, Some(catalog), Some(identifier), options)
  }
}

private[sql] object V2TableReference {

  case class TableInfo(
      columns: Seq[Column],
      metadataColumns: Seq[MetadataColumn])

  sealed trait Context
  case class TemporaryViewContext(viewName: Seq[String]) extends Context

  def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): V2TableReference = {
    create(relation, TemporaryViewContext(viewName))
  }

  private def create(relation: DataSourceV2Relation, context: Context): V2TableReference = {
    val ref = V2TableReference(
      relation.catalog.get.asTableCatalog,
      relation.identifier.get,
      relation.options,
      TableInfo(
        columns = relation.table.columns.toImmutableArraySeq,
        metadataColumns = V2TableUtil.extractMetadataColumns(relation)),
      relation.output,
      context)
    ref.copyTagsFrom(relation)
    ref
  }
}

private[sql] object V2TableReferenceUtils extends SQLConfHelper {

  def validateLoadedTable(table: Table, ref: V2TableReference): Unit = {
    ref.context match {
      case ctx: TemporaryViewContext =>
        validateLoadedTableInTempView(table, ref, ctx)
      case ctx =>
        throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}")
    }
  }

  private def validateLoadedTableInTempView(
      table: Table,
      ref: V2TableReference,
      ctx: TemporaryViewContext): Unit = {
    val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)

    val dataErrors = V2TableUtil.validateCapturedColumns(table, ref.info.columns)
    if (dataErrors.nonEmpty) {
      throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
        ctx.viewName,
        tableName,
        dataErrors)
    }

    val metaErrors = V2TableUtil.validateCapturedMetadataColumns(table, ref.info.metadataColumns)
    if (metaErrors.nonEmpty) {
      throw QueryCompilationErrors.metadataColumnsChangedAfterViewWithPlanCreation(
        ctx.viewName,
        tableName,
        metaErrors)
    }
  }
}
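A sketch of when this validation fires (hypothetical names; assumes the connector supports `ALTER TABLE ... DROP COLUMN`):

```scala
spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.t")

// The table evolves after the view plan (and its TableInfo) were captured.
spark.sql("ALTER TABLE testcat.ns.t DROP COLUMN data")

// The next view access reloads the table, validateLoadedTableInTempView compares
// its columns against the captured TableInfo, and analysis fails with
// INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION.
spark.sql("SELECT * FROM v")
```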

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ private[sql] object V2TableUtil extends SQLConfHelper {
   }
 
   // extracts original column info for all metadata attributes in relation
-  private def extractMetadataColumns(relation: DataSourceV2Relation): Seq[MetadataColumn] = {
+  def extractMetadataColumns(relation: DataSourceV2Relation): Seq[MetadataColumn] = {
     val metaAttrs = relation.output.filter(_.isMetadataCol)
     if (metaAttrs.nonEmpty) {
       val metaCols = metadataColumns(relation.table)

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 26 additions & 0 deletions
@@ -4466,4 +4466,30 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       )
     )
   }
+
+  def columnsChangedAfterViewWithPlanCreation(
+      viewName: Seq[String],
+      tableName: Seq[String],
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+      messageParameters = Map(
+        "viewName" -> toSQLId(viewName),
+        "tableName" -> toSQLId(tableName),
+        "colType" -> "data",
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
+  def metadataColumnsChangedAfterViewWithPlanCreation(
+      viewName: Seq[String],
+      tableName: Seq[String],
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+      messageParameters = Map(
+        "viewName" -> toSQLId(viewName),
+        "tableName" -> toSQLId(tableName),
+        "colType" -> "metadata",
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -250,6 +251,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       val nameInCache = v2Ident.toQualifiedNameParts(catalog)
       isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
 
+    case r: V2TableReference =>
+      isSameName(r.identifier.toQualifiedNameParts(r.catalog))
+
     case v: View =>
       isSameName(v.desc.identifier.nameParts)
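A hedged illustration of why the new case is needed (hypothetical names; the calling lookup path is not shown in this diff): plans inspected by this name-matching predicate can now contain `V2TableReference` nodes, which would otherwise never match the table's name.

```scala
spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.t")
spark.sql("CACHE TABLE v")

// Name-based cache invalidation for testcat.ns.t (e.g. on DROP TABLE) can now
// recognize a V2TableReference by its catalog and identifier, just like a
// resolved DataSourceV2Relation.
spark.sql("DROP TABLE testcat.ns.t")
```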

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 13 additions & 1 deletion
@@ -27,13 +27,15 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported, ViewSchemaMode, ViewType}
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, VariableReference}
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.{MetadataBuilder, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -733,7 +735,17 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
     } else {
       TemporaryViewRelation(
         prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan, defaultCollation),
-        Some(aliasedPlan))
+        Some(prepareTemporaryViewPlan(name, aliasedPlan)))
+    }
+  }
+
+  private def prepareTemporaryViewPlan(
+      viewName: TableIdentifier,
+      plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case r: DataSourceV2Relation
+          if r.catalog.isDefined && r.identifier.isDefined && r.timeTravelSpec.isEmpty =>
+        V2TableReference.createForTempView(r, viewName.nameParts)
     }
   }
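Conceptually (a hand-written sketch, not actual EXPLAIN output), the analyzed plan stored for a matching temp view changes as below; relations with a time-travel spec are deliberately left untouched by the `timeTravelSpec.isEmpty` guard:

```
before:  Project [id#1, data#2]
         +- SubqueryAlias testcat.ns.t
            +- RelationV2[id#1, data#2] testcat.ns.t      (pins one Table instance)

after:   Project [id#1, data#2]
         +- SubqueryAlias testcat.ns.t
            +- TableReference[id#1, data#2] testcat.ns.t  (placeholder, re-resolved
                                                           and reloaded on each access)
```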
