Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1069,27 +1069,30 @@ class Analyzer(
case view: View if !view.child.resolved =>
ViewResolution
.resolve(view, options, resolveChild = executeSameContext, checkAnalysis = checkAnalysis)
// V2TableReference is a placeholder for DSv2 tables that needs to be resolved to
// DataSourceV2Relation on each view access. Only dataframe temp view may contain it
// as it stores resolved plans directly.
case view: View if view.isTempViewStoringAnalyzedPlan =>
view.copy(child = resolveTableReferences(view.child))
case p @ SubqueryAlias(_, view: View) =>
p.copy(child = resolveViews(view, options))
case _ => plan
}

// Unwrap temp views storing analyzed plans and resolve V2TableReference nodes in the child.
private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
EliminateSubqueryAliases(plan) match {
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
case v: View if v.isTempViewStoringAnalyzedPlan => resolveTableReferences(v.child)
case other => other
}
}

private def resolveAsV2Relation(plan: LogicalPlan): Option[DataSourceV2Relation] = {
plan match {
case ref: V2TableReference =>
EliminateSubqueryAliases(relationResolution.resolveReference(ref)) match {
case r: DataSourceV2Relation => Some(r)
case _ => None
}
case r: DataSourceV2Relation => Some(r)
case _ => None
// Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views
// (via V2TableReference.createForTempView), so we only need to resolve it when returning
// the plan of temp views (in resolveViews and unwrapRelationPlan).
private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUp {
case r: V2TableReference => relationResolution.resolveReference(r)
}
}

Expand All @@ -1099,14 +1102,13 @@ class Analyzer(
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
case r: V2TableReference =>
relationResolution.resolveReference(r)
case other => other
}

// Inserting into a file-based temporary view is allowed.
// (e.g., spark.read.parquet("path").createOrReplaceTempView("t").
// Thus, we need to look at the raw plan if `relation` is a temporary view.
// unwrapRelationPlan also resolves V2TableReference nodes in temp view plans.
unwrapRelationPlan(relation) match {
case v: View =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
Expand All @@ -1123,11 +1125,10 @@ class Analyzer(
case u: UnresolvedCatalogRelation =>
throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
u.tableMeta.identifier, write)
case plan =>
resolveAsV2Relation(plan).map(write.withNewTable).getOrElse {
throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
u.multipartIdentifier.quoted)
}
case r: DataSourceV2Relation => write.withNewTable(r)
case _ =>
throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
u.multipartIdentifier.quoted)
}.getOrElse(write)
case _ => write
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
context = u.origin.getQueryContext,
summary = u.origin.context.summary)

case r: V2TableReference =>
throw SparkException.internalError(
s"V2TableReference should be resolved during analysis: ${r.name}")

case u: UnresolvedInlineTable if unresolvedInlineTableContainsScalarSubquery(u) =>
throw QueryCompilationErrors.inlineTableContainsScalarSubquery(u)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,7 @@ class RelationResolution(
private def loadRelation(ref: V2TableReference): LogicalPlan = {
val table = ref.catalog.loadTable(ref.identifier)
V2TableReferenceUtils.validateLoadedTable(table, ref)
val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
SubqueryAlias(tableName, ref.toRelation(table))
ref.toRelation(table)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V2TableReference was created from v2 relation, and should be restored back to v2 relation, without SubqueryAlias, so that it's a round trip.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.internal.{Logging, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.analysis.V2TableReference
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
Expand Down Expand Up @@ -261,9 +260,6 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
isSameName(name, nameInCache, resolver) && (includeTimeTravel || timeTravelSpec.isEmpty)

case r: V2TableReference =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only cache resolved plans, and we only perform cache lookup for resolved plans, so V2TableReference should never exist in the cache manager.

isSameName(name, r.identifier.toQualifiedNameParts(r.catalog), resolver)

case v: View =>
isSameName(name, v.desc.identifier.nameParts, resolver)

Expand Down