diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d6154b1e26664..1ca370119438b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1069,27 +1069,30 @@ class Analyzer( case view: View if !view.child.resolved => ViewResolution .resolve(view, options, resolveChild = executeSameContext, checkAnalysis = checkAnalysis) + // V2TableReference is a placeholder for DSv2 tables that needs to be resolved to + // DataSourceV2Relation on each view access. Only dataframe temp view may contain it + // as it stores resolved plans directly. + case view: View if view.isTempViewStoringAnalyzedPlan => + view.copy(child = resolveTableReferences(view.child)) case p @ SubqueryAlias(_, view: View) => p.copy(child = resolveViews(view, options)) case _ => plan } + // Unwrap temp views storing analyzed plans and resolve V2TableReference nodes in the child. private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = { EliminateSubqueryAliases(plan) match { - case v: View if v.isTempViewStoringAnalyzedPlan => v.child + case v: View if v.isTempViewStoringAnalyzedPlan => resolveTableReferences(v.child) case other => other } } - private def resolveAsV2Relation(plan: LogicalPlan): Option[DataSourceV2Relation] = { - plan match { - case ref: V2TableReference => - EliminateSubqueryAliases(relationResolution.resolveReference(ref)) match { - case r: DataSourceV2Relation => Some(r) - case _ => None - } - case r: DataSourceV2Relation => Some(r) - case _ => None + // Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views + // (via V2TableReference.createForTempView), so we only need to resolve it when returning + // the plan of temp views (in resolveViews and unwrapRelationPlan). + private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsUp { + case r: V2TableReference => relationResolution.resolveReference(r) } } @@ -1099,14 +1102,13 @@ class Analyzer( val relation = table match { case u: UnresolvedRelation if !u.isStreaming => resolveRelation(u).getOrElse(u) - case r: V2TableReference => - relationResolution.resolveReference(r) case other => other } // Inserting into a file-based temporary view is allowed. // (e.g., spark.read.parquet("path").createOrReplaceTempView("t"). // Thus, we need to look at the raw plan if `relation` is a temporary view. + // unwrapRelationPlan also resolves V2TableReference nodes in temp view plans. unwrapRelationPlan(relation) match { case v: View => throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table) @@ -1123,11 +1125,10 @@ class Analyzer( case u: UnresolvedCatalogRelation => throw QueryCompilationErrors.writeIntoV1TableNotAllowedError( u.tableMeta.identifier, write) - case plan => - resolveAsV2Relation(plan).map(write.withNewTable).getOrElse { - throw QueryCompilationErrors.writeIntoTempViewNotAllowedError( - u.multipartIdentifier.quoted) - } + case r: DataSourceV2Relation => write.withNewTable(r) + case _ => + throw QueryCompilationErrors.writeIntoTempViewNotAllowedError( + u.multipartIdentifier.quoted) }.getOrElse(write) case _ => write } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index c34ae507e7583..e6266803d0126 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -315,6 +315,10 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString context = u.origin.getQueryContext, summary = u.origin.context.summary) + case r: V2TableReference => + throw SparkException.internalError( + s"V2TableReference should be resolved during analysis: ${r.name}") + case u: UnresolvedInlineTable if unresolvedInlineTableContainsScalarSubquery(u) => throw QueryCompilationErrors.inlineTableContainsScalarSubquery(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 15d5e4874dbb5..6fe386c3c6772 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -274,8 +274,7 @@ class RelationResolution( private def loadRelation(ref: V2TableReference): LogicalPlan = { val table = ref.catalog.loadTable(ref.identifier) V2TableReferenceUtils.validateLoadedTable(table, ref) - val tableName = ref.identifier.toQualifiedNameParts(ref.catalog) - SubqueryAlias(tableName, ref.toRelation(table)) + ref.toRelation(table) } private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 34e47084f6561..cb6f7e5883210 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -25,7 +25,6 @@ import org.apache.spark.internal.{Logging, MessageWithContext} import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.analysis.V2TableReference import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint @@ -261,9 +260,6 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { val nameInCache = v2Ident.toQualifiedNameParts(catalog) isSameName(name, nameInCache, resolver) && (includeTimeTravel || timeTravelSpec.isEmpty) - case r: V2TableReference => - isSameName(name, r.identifier.toQualifiedNameParts(r.catalog), resolver) - case v: View => isSameName(name, v.desc.identifier.nameParts, resolver)