Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}

import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.internal.config.ConfigBindingPolicy
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
Expand Down Expand Up @@ -139,10 +139,16 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo
* even if a temp view `t` has been created.
* @param outerPlan The query plan from the outer query that can be used to resolve star
* expressions in a subquery.
* @param resolutionPathEntries When resolving a view body, the ordered path for unqualified
* relation names. Stays [[None]] in this PR; population from the
* frozen path stored in view metadata is wired in a follow-up.
* Outside views: compute from session
* [[CatalogManager.sqlResolutionPathEntries]].
Comment thread
srielau marked this conversation as resolved.
*/
case class AnalysisContext(
isDefault: Boolean = false,
catalogAndNamespace: Seq[String] = Nil,
resolutionPathEntries: Option[Seq[Seq[String]]] = None,
nestedViewDepth: Int = 0,
maxNestedViewDepth: Int = -1,
relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] =
Expand Down Expand Up @@ -198,7 +204,6 @@ object AnalysisContext {
def withAnalysisContext[A](viewDesc: CatalogTable)(f: => A): A = {
val originContext = value.get()
val maxNestedViewDepth = if (originContext.maxNestedViewDepth == -1) {
// Here we start to resolve views, get `maxNestedViewDepth` from configs.
SQLConf.get.maxNestedViewDepth
} else {
originContext.maxNestedViewDepth
Expand Down Expand Up @@ -290,13 +295,20 @@ object Analyzer {
*/
class Analyzer(
override val catalogManager: CatalogManager,
private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
private[sql] val sharedRelationCache: RelationCache = RelationCache.empty,
private[sql] val sessionConf: Option[SQLConf] = None)
extends RuleExecutor[LogicalPlan]
with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper {

/** Conf to use for path-based resolution and error messages; uses session conf when available. */
private[sql] def resolutionConf: SQLConf = sessionConf.getOrElse(SQLConf.get)


private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
private val relationResolution = new RelationResolution(catalogManager, sharedRelationCache)
private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
private val relationResolution =
new RelationResolution(catalogManager, sharedRelationCache, sessionConf)
private val functionResolution = new FunctionResolution(catalogManager, relationResolution,
resolutionConf)

override protected def validatePlanChanges(
previousPlan: LogicalPlan,
Expand All @@ -317,19 +329,27 @@ class Analyzer(
if (plan.analyzed) {
plan
} else {
def runAnalysis(): LogicalPlan = HybridAnalyzer.fromLegacyAnalyzer(
legacyAnalyzer = this, tracker = tracker).apply(plan)
if (AnalysisContext.get.isDefault) {
AnalysisContext.reset()
try {
AnalysisHelper.markInAnalyzer {
HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan)
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
}
} finally {
AnalysisContext.reset()
}
} else {
AnalysisContext.withNewAnalysisContext {
AnalysisHelper.markInAnalyzer {
HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan)
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
}
}
}
Expand All @@ -342,7 +362,13 @@ class Analyzer(
}
}

private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match {
// Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view configs).
// Otherwise, run analysis with the captured session conf for analyzer isolation.
case Some(c) if SQLConf.get ne c => super.execute(plan)
case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) }
case None => super.execute(plan)
}

def resolver: Resolver = conf.resolver

Expand Down Expand Up @@ -981,7 +1007,8 @@ class Analyzer(
// This is done by keeping the catalog and namespace in `AnalysisContext`, and analyzer will
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
// with it, instead of current catalog and namespace. Unqualified relation PATH will be
// snapshotted in `AnalysisContext.resolutionPathEntries` in a follow-up PR.
private def resolveViews(
plan: LogicalPlan,
options: CaseInsensitiveStringMap): LogicalPlan = plan match {
Expand Down Expand Up @@ -1090,7 +1117,7 @@ class Analyzer(
case other => other
}.getOrElse(u)

case u @ UnresolvedTableOrView(identifier, cmd, allowTempView) =>
case u @ UnresolvedTableOrView(identifier, cmd, allowTempView, _) =>
lookupTableOrView(identifier).map {
case _: ResolvedTempView if !allowTempView =>
throw QueryCompilationErrors.expectPermanentViewNotTempViewError(
Expand Down Expand Up @@ -2078,9 +2105,9 @@ class Analyzer(
throw QueryCompilationErrors.notAScalarFunctionError(nameParts.mkString("."), f)

case FunctionType.NotFound =>
val catalogPath =
catalogManager.currentCatalog.name +: catalogManager.currentNamespace
val searchPath = SQLConf.get.resolutionSearchPath(catalogPath.toSeq)
val catalogPath = catalogPathForError
val searchPath = catalogManager
.sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq)
.map(_.quoted)
throw QueryCompilationErrors.unresolvedRoutineError(
nameParts,
Expand Down Expand Up @@ -2278,20 +2305,8 @@ class Analyzer(
object ResolveProcedures extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(UNRESOLVED_PROCEDURE), ruleId) {
case UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)) =>
val procedureCatalog = catalog.asProcedureCatalog
val procedure = load(procedureCatalog, ident)
ResolvedProcedure(procedureCatalog, ident, procedure)
}

private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = {
try {
catalog.loadProcedure(ident)
} catch {
case e: Exception if !e.isInstanceOf[SparkThrowable] =>
val nameParts = catalog.name +: ident.asMultipartIdentifier
throw QueryCompilationErrors.failedToLoadRoutineError(nameParts, e)
}
case u: UnresolvedProcedure =>
functionResolution.resolveProcedure(u)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.catalyst.analysis

import java.util.Locale

import scala.collection.mutable

import org.apache.spark.{SparkException, SparkThrowable}
Expand Down Expand Up @@ -45,6 +43,8 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString

protected def isView(nameParts: Seq[String]): Boolean

protected def conf: org.apache.spark.sql.internal.SQLConf

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

/**
Expand Down Expand Up @@ -73,34 +73,47 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
* Contains system.session and the current catalog namespace only. Not from SQLConf.
*/
private def ddlSearchPathForError(catalogPath: Seq[String]): Seq[String] = {
Seq(toSQLId(Seq("system", "session")), toSQLId(catalogPath))
val sessionPath = Seq(
CatalogManager.SYSTEM_CATALOG_NAME,
CatalogManager.SESSION_NAMESPACE)
Seq(toSQLId(sessionPath), toSQLId(catalogPath))
}

/**
* `SQLConf.resolutionSearchPath` entries formatted with [[toSQLId]] for TABLE_OR_VIEW_NOT_FOUND.
* Same ordering as relation resolution and routine resolution search paths.
* Formats [[CatalogManager.sqlResolutionPathEntries]] with [[toSQLId]]
* for TABLE_OR_VIEW_NOT_FOUND error messages.
*/
private def fullSearchPathForError(catalogPath: Seq[String]): Seq[String] = {
SQLConf.get.resolutionSearchPath(catalogPath).map(toSQLId)
val catalog = catalogPath.head
val ns = catalogPath.tail.toSeq
catalogManager.sqlResolutionPathEntries(catalog, ns).map(toSQLId)
}

/** Current catalog name and namespace as a path, used when computing search path for errors. */
private def catalogPathForError: Seq[String] = {
(currentCatalog.name +: catalogManager.currentNamespace).toSeq
/**
* Catalog + namespace path for error messages. When resolving inside a view body,
* uses the view's defining catalog/namespace from AnalysisContext so the error
* reflects where the view was trying to resolve.
*/
protected final def catalogPathForError: Seq[String] = {
val ctx = AnalysisContext.get.catalogAndNamespace
if (ctx.nonEmpty) ctx
else (currentCatalog.name +: catalogManager.currentNamespace).toSeq
}

/**
* Search path for TABLE_OR_VIEW_NOT_FOUND when the command targets only temp views
* (e.g. DROP TEMPORARY VIEW). Contains system.session only.
*/
private def tempViewOnlySearchPathForError(): Seq[String] = {
Seq(toSQLId(Seq("system", "session")))
Seq(toSQLId(Seq(
CatalogManager.SYSTEM_CATALOG_NAME,
CatalogManager.SESSION_NAMESPACE)))
}

/**
* Search path for TABLE_OR_VIEW_NOT_FOUND on unresolved relations in SELECT/DML/INSERT/time
* travel. Three-part `system.session.name` resolves only to session temp views, so only that
* scope is listed. Other names use [[fullSearchPathForError]] (resolutionSearchPath order).
* scope is listed. Other names use [[fullSearchPathForError]] (sqlResolutionPathEntries order).
*/
private def searchPathForUnresolvedRelation(multipartIdentifier: Seq[String]): Seq[String] = {
if (CatalogManager.isFullyQualifiedSystemSessionViewName(multipartIdentifier)) {
Expand Down Expand Up @@ -381,17 +394,15 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString

case u: UnresolvedTableOrView =>
val catalogPath = catalogPathForError
val searchPath = if (u.commandName.toUpperCase(Locale.ROOT).contains("TEMPORARY VIEW")) {
tempViewOnlySearchPathForError()
} else if (u.commandName.toUpperCase(Locale.ROOT).startsWith("DESCRIBE") ||
u.commandName.toUpperCase(Locale.ROOT).startsWith("DESC ")) {
if (CatalogManager.isFullyQualifiedSystemSessionViewName(u.multipartIdentifier)) {
tempViewOnlySearchPathForError()
} else {
fullSearchPathForError(catalogPath)
}
} else {
ddlSearchPathForError(catalogPath)
val searchPath = u.tableNotFoundSearchPathMode match {
case UnresolvedTableOrViewSearchPathMode.QueryLike =>
if (CatalogManager.isFullyQualifiedSystemSessionViewName(u.multipartIdentifier)) {
tempViewOnlySearchPathForError()
} else {
fullSearchPathForError(catalogPath)
}
case UnresolvedTableOrViewSearchPathMode.Ddl =>
ddlSearchPathForError(catalogPath)
}
u.tableNotFound(u.multipartIdentifier, searchPath)

Expand All @@ -401,8 +412,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
searchPathForUnresolvedRelation(u.multipartIdentifier))

case u: UnresolvedFunctionName =>
val searchPath =
SQLConf.get.resolutionSearchPath(catalogPathForError).map(_.quoted)
val searchPath = fullSearchPathForError(catalogPathForError)
throw QueryCompilationErrors.unresolvedRoutineError(
u.multipartIdentifier,
searchPath,
Expand Down Expand Up @@ -595,7 +605,9 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
searchPathForUnresolvedRelation(u.multipartIdentifier))

case RelationChanges(u: UnresolvedRelation, _) =>
u.tableNotFound(u.multipartIdentifier)
u.tableNotFound(
u.multipartIdentifier,
searchPathForUnresolvedRelation(u.multipartIdentifier))

case etw: EventTimeWatermark =>
etw.eventTime.dataType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {

// Resolves `UnresolvedAttribute` to its value.
protected def resolveVariables(e: Expression): Expression = {
val variableResolution = new VariableResolution(catalogManager.tempVariableManager)
val variableResolution =
new VariableResolution(catalogManager.tempVariableManager, catalogManager)

def resolve(nameParts: Seq[String]): Option[Expression] = {
variableResolution.resolveMultipartName(
Expand Down
Loading