Skip to content

Commit 92cea9b

Browse files
committed
Review feedback
1 parent a9251b6 commit 92cea9b

File tree

6 files changed

+52
-27
lines changed

6 files changed

+52
-27
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1224,7 +1224,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
12241224
case u: UnresolvedRelation =>
12251225
resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
12261226

1227-
case r: TableReference =>
1227+
case r: V2TableReference =>
12281228
relationResolution.resolveReference(r)
12291229

12301230
case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,13 +225,13 @@ class RelationResolution(override val catalogManager: CatalogManager)
225225
}
226226
}
227227

228-
def resolveReference(ref: TableReference): LogicalPlan = {
228+
def resolveReference(ref: V2TableReference): LogicalPlan = {
229229
val relation = getOrLoadRelation(ref)
230230
val planId = ref.getTagValue(LogicalPlan.PLAN_ID_TAG)
231231
cloneWithPlanId(relation, planId)
232232
}
233233

234-
private def getOrLoadRelation(ref: TableReference): LogicalPlan = {
234+
private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
235235
val key = toCacheKey(ref.catalog, ref.identifier)
236236
relationCache.get(key) match {
237237
case Some(cached) =>
@@ -243,14 +243,14 @@ class RelationResolution(override val catalogManager: CatalogManager)
243243
}
244244
}
245245

246-
private def loadRelation(ref: TableReference): LogicalPlan = {
246+
private def loadRelation(ref: V2TableReference): LogicalPlan = {
247247
val table = ref.catalog.loadTable(ref.identifier)
248-
TableReferenceUtils.validateLoadedTable(table, ref)
248+
V2TableReferenceUtils.validateLoadedTable(table, ref)
249249
val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
250250
SubqueryAlias(tableName, ref.toRelation(table))
251251
}
252252

253-
private def adaptCachedRelation(cached: LogicalPlan, ref: TableReference): LogicalPlan = {
253+
private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = {
254254
cached transform {
255255
case r: DataSourceV2Relation if matchesReference(r, ref) =>
256256
r.copy(output = ref.output, options = ref.options)
@@ -259,7 +259,7 @@ class RelationResolution(override val catalogManager: CatalogManager)
259259

260260
private def matchesReference(
261261
relation: DataSourceV2Relation,
262-
ref: TableReference): Boolean = {
262+
ref: V2TableReference): Boolean = {
263263
relation.catalog.contains(ref.catalog) && relation.identifier.contains(ref.identifier)
264264
}
265265

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableReference.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.SparkException
2121
import org.apache.spark.sql.catalyst.SQLConfHelper
22-
import org.apache.spark.sql.catalyst.analysis.TableReference.Context
23-
import org.apache.spark.sql.catalyst.analysis.TableReference.TableInfo
24-
import org.apache.spark.sql.catalyst.analysis.TableReference.TemporaryViewContext
22+
import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
23+
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
24+
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
2525
import org.apache.spark.sql.catalyst.expressions.AttributeReference
2626
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
2727
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -39,7 +39,16 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3939
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4040
import org.apache.spark.util.ArrayImplicits._
4141

42-
case class TableReference private (
42+
/**
43+
* A reference to a V2 table.
44+
*
45+
* References are placeholders for the latest table metadata and are replaced with actual table
46+
* versions during analysis, allowing Spark to reload tables with up-to-date metadata. The newly
47+
* loaded table metadata is validated against the original metadata depending on the context.
48+
* For instance, temporary views with fully resolved logical plans don't allow schema changes
49+
* in underlying tables.
50+
*/
51+
case class V2TableReference private(
4352
catalog: TableCatalog,
4453
identifier: Identifier,
4554
options: CaseInsensitiveStringMap,
@@ -50,7 +59,7 @@ case class TableReference private (
5059

5160
override def name: String = V2TableUtil.toQualifiedName(catalog, identifier)
5261

53-
override def newInstance(): TableReference = {
62+
override def newInstance(): V2TableReference = {
5463
copy(output = output.map(_.newInstance()))
5564
}
5665

@@ -66,7 +75,7 @@ case class TableReference private (
6675
}
6776
}
6877

69-
object TableReference {
78+
object V2TableReference {
7079

7180
case class TableInfo(
7281
columns: Seq[Column],
@@ -75,12 +84,12 @@ object TableReference {
7584
sealed trait Context
7685
case class TemporaryViewContext(viewName: Seq[String]) extends Context
7786

78-
def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): TableReference = {
87+
def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): V2TableReference = {
7988
create(relation, TemporaryViewContext(viewName))
8089
}
8190

82-
private def create(relation: DataSourceV2Relation, context: Context): TableReference = {
83-
val ref = TableReference(
91+
private def create(relation: DataSourceV2Relation, context: Context): V2TableReference = {
92+
val ref = V2TableReference(
8493
relation.catalog.get.asTableCatalog,
8594
relation.identifier.get,
8695
relation.options,
@@ -94,9 +103,9 @@ object TableReference {
94103
}
95104
}
96105

97-
object TableReferenceUtils extends SQLConfHelper {
106+
object V2TableReferenceUtils extends SQLConfHelper {
98107

99-
def validateLoadedTable(table: Table, ref: TableReference): Unit = {
108+
def validateLoadedTable(table: Table, ref: V2TableReference): Unit = {
100109
ref.context match {
101110
case ctx: TemporaryViewContext =>
102111
validateLoadedTableInTempView(table, ref, ctx)
@@ -107,7 +116,7 @@ object TableReferenceUtils extends SQLConfHelper {
107116

108117
private def validateLoadedTableInTempView(
109118
table: Table,
110-
ref: TableReference,
119+
ref: V2TableReference,
111120
ctx: TemporaryViewContext): Unit = {
112121
val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
113122

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2222
import org.apache.spark.internal.{Logging, MessageWithContext}
2323
import org.apache.spark.internal.LogKeys._
2424
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
25-
import org.apache.spark.sql.catalyst.analysis.TableReference
25+
import org.apache.spark.sql.catalyst.analysis.V2TableReference
2626
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
2727
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
2828
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -251,7 +251,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
251251
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
252252
isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
253253

254-
case r: TableReference =>
254+
case r: V2TableReference =>
255255
isSameName(r.identifier.toQualifiedNameParts(r.catalog))
256256

257257
case v: View =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{Row, SparkSession}
2828
import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper, TableIdentifier}
2929
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported, ViewSchemaMode, ViewType}
30-
import org.apache.spark.sql.catalyst.analysis.TableReference
30+
import org.apache.spark.sql.catalyst.analysis.V2TableReference
3131
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
3232
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, VariableReference}
3333
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
@@ -745,7 +745,7 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
745745
plan transform {
746746
case r: DataSourceV2Relation
747747
if r.catalog.isDefined && r.identifier.isDefined && r.timeTravelSpec.isEmpty =>
748-
TableReference.createForTempView(r, viewName.nameParts)
748+
V2TableReference.createForTempView(r, viewName.nameParts)
749749
}
750750
}
751751

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,10 +2457,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
24572457
}
24582458
}
24592459

2460-
test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp views with plans") {
2460+
test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view (plan)") {
2461+
checkInsertInvalidatesCacheOfSQLTempView(storePlan = true)
2462+
}
2463+
2464+
test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view (text)") {
2465+
checkInsertInvalidatesCacheOfSQLTempView(storePlan = false)
2466+
}
2467+
2468+
private def checkInsertInvalidatesCacheOfSQLTempView(storePlan: Boolean): Unit = {
24612469
val t = "testcat.tbl"
24622470
withTable(t, "v") {
2463-
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
2471+
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storePlan.toString) {
24642472
sql(s"CREATE TABLE $t (id int, data string) USING foo")
24652473
sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
24662474

@@ -2487,10 +2495,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
24872495
}
24882496
}
24892497

2490-
test("SPARK-53924: uncache DSv2 table using SQL uncaches SQL temp views with plans") {
2498+
test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (plan)") {
2499+
checkUncacheTableUncachesSQLTempView(storePlan = true)
2500+
}
2501+
2502+
test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (text)") {
2503+
checkUncacheTableUncachesSQLTempView(storePlan = false)
2504+
}
2505+
2506+
private def checkUncacheTableUncachesSQLTempView(storePlan: Boolean): Unit = {
24912507
val t = "testcat.tbl"
24922508
withTable(t, "v") {
2493-
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
2509+
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storePlan.toString) {
24942510
sql(s"CREATE TABLE $t (id int, data string) USING foo")
24952511
sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
24962512

0 commit comments

Comments
 (0)