From 4f300f70868596a887c40a8de117978d86df3e81 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Thu, 23 Apr 2026 16:37:20 -0400 Subject: [PATCH 1/9] [SPARK-56605][SQL] Wire resolution engine to use SQL PATH for table, function, and variable lookup Switch the resolution engine from the legacy single-schema resolutionSearchPath to sqlResolutionPathEntries on CatalogManager, so that SET PATH actually affects how unqualified names are resolved. Key changes: - CatalogManager: add sqlResolutionPathEntries and sessionScopeUnqualifiedAllowed (reads the stored session path) - RelationResolution: walk path entries via PersistentCatalogStep, so that each entry qualifies the object name under that catalog/namespace - FunctionResolution: use sqlResolutionPathEntriesForAnalysis for resolution candidates; move procedure resolution here - Analyzer: add sessionConf/resolutionConf for isolated analysis; AnalysisContext gains resolutionPathEntries field (initially None) - CheckAnalysis: view-body-aware catalogPathForError, UnresolvedTableOrViewSearchPathMode, CatalogManager constants - VariableResolution: allowUnqualifiedSessionTempVariableLookup gates unqualified variable access when system.session is not on PATH - Single-pass resolver updates aligned with new resolution signatures Frozen path analysis for views/SQL functions comes in a follow-up PR. Part of SPARK-54810. 
--- .../sql/catalyst/analysis/Analyzer.scala | 72 ++++++----- .../sql/catalyst/analysis/CheckAnalysis.scala | 60 +++++---- .../analysis/ColumnResolutionHelper.scala | 3 +- .../analysis/FunctionResolution.scala | 114 ++++++++++++++++-- .../analysis/RelationResolution.scala | 81 +++++++++---- .../catalyst/analysis/ResolveCatalogs.scala | 6 + .../analysis/ResolveFetchCursor.scala | 3 +- .../analysis/ResolveSetVariable.scala | 3 +- .../analysis/VariableResolution.scala | 19 ++- .../analysis/resolver/HybridAnalyzer.scala | 1 + .../analysis/resolver/NameScope.scala | 4 +- .../catalyst/analysis/resolver/Resolver.scala | 11 +- .../analysis/resolver/ResolverGuard.scala | 7 +- .../catalyst/analysis/v2ResolutionPlans.scala | 26 +++- .../sql/catalyst/parser/AstBuilder.scala | 8 +- .../connector/catalog/CatalogManager.scala | 37 ++++++ .../analysis/TableLookupCacheSuite.scala | 9 ++ ...TimezoneAwareExpressionResolverSuite.scala | 5 +- .../explain-results/read_changes.explain | 4 +- .../read_changes_with_options.explain | 4 +- .../explain-results/read_table.explain | 4 +- ...streaming_changes_API_with_options.explain | 4 +- .../streaming_table_API_with_options.explain | 4 +- .../query-tests/explain-results/table.explain | 4 +- .../table_API_with_options.explain | 4 +- .../connect/ProtoToParsedPlanTestSuite.scala | 26 +++- .../apache/spark/sql/classic/Catalog.scala | 6 +- .../spark/sql/execution/SparkSqlParser.scala | 9 +- .../sql/execution/command/SetCommand.scala | 4 +- .../internal/BaseSessionStateBuilder.scala | 2 +- .../sql-tests/results/describe.sql.out | 2 +- .../org/apache/spark/sql/SetPathSuite.scala | 80 ++++++++++-- .../analysis/resolver/NameScopeSuite.scala | 1 + .../command/AlignAssignmentsSuiteBase.scala | 7 ++ .../execution/command/DDLParserSuite.scala | 2 +- .../command/PlanResolutionSuite.scala | 17 +++ 36 files changed, 515 insertions(+), 138 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c09361969a9e4..81232aac909df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import scala.util.{Failure, Random, Success, Try} -import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException} +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.config.ConfigBindingPolicy import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ @@ -139,17 +139,22 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * even if a temp view `t` has been created. * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery. + * @param resolutionPathEntries When resolving a view body, the ordered path for unqualified + * relation names (see [[AnalysisContext.withAnalysisContext]]). + * [[None]] outside views: compute from session + * [[CatalogManager.sqlResolutionPathEntries]]. */ case class AnalysisContext( isDefault: Boolean = false, catalogAndNamespace: Seq[String] = Nil, + resolutionPathEntries: Option[Seq[Seq[String]]] = None, nestedViewDepth: Int = 0, maxNestedViewDepth: Int = -1, relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] = mutable.Map.empty, referredTempViewNames: Seq[Seq[String]] = Seq.empty, // 1. If we are resolving a view, this field will be restored from the view metadata, - // by calling `AnalysisContext.withAnalysisContext(viewDesc)`. + // by calling `AnalysisContext.withAnalysisContext(viewDesc, catalogManager)`. // 2. 
If we are not resolving a view, this field will be updated everytime the analyzer // lookup a temporary function. And export to the view metadata. referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty, @@ -198,7 +203,6 @@ object AnalysisContext { def withAnalysisContext[A](viewDesc: CatalogTable)(f: => A): A = { val originContext = value.get() val maxNestedViewDepth = if (originContext.maxNestedViewDepth == -1) { - // Here we start to resolve views, get `maxNestedViewDepth` from configs. SQLConf.get.maxNestedViewDepth } else { originContext.maxNestedViewDepth @@ -290,13 +294,20 @@ object Analyzer { */ class Analyzer( override val catalogManager: CatalogManager, - private[sql] val sharedRelationCache: RelationCache = RelationCache.empty) + private[sql] val sharedRelationCache: RelationCache = RelationCache.empty, + private[sql] val sessionConf: Option[SQLConf] = None) extends RuleExecutor[LogicalPlan] with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper { + /** Conf to use for path-based resolution and error messages; uses session conf when available. 
*/ + private[sql] def resolutionConf: SQLConf = sessionConf.getOrElse(SQLConf.get) + + private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog - private val relationResolution = new RelationResolution(catalogManager, sharedRelationCache) - private val functionResolution = new FunctionResolution(catalogManager, relationResolution) + private val relationResolution = + new RelationResolution(catalogManager, sharedRelationCache, sessionConf) + private val functionResolution = new FunctionResolution(catalogManager, relationResolution, + resolutionConf) override protected def validatePlanChanges( previousPlan: LogicalPlan, @@ -317,11 +328,16 @@ class Analyzer( if (plan.analyzed) { plan } else { + def runAnalysis(): LogicalPlan = HybridAnalyzer.fromLegacyAnalyzer( + legacyAnalyzer = this, tracker = tracker).apply(plan) if (AnalysisContext.get.isDefault) { AnalysisContext.reset() try { AnalysisHelper.markInAnalyzer { - HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan) + sessionConf match { + case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() } + case None => runAnalysis() + } } } finally { AnalysisContext.reset() @@ -329,7 +345,10 @@ class Analyzer( } else { AnalysisContext.withNewAnalysisContext { AnalysisHelper.markInAnalyzer { - HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan) + sessionConf match { + case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() } + case None => runAnalysis() + } } } } @@ -342,7 +361,10 @@ class Analyzer( } } - private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan) + private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match { + case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) } + case None => super.execute(plan) + } def resolver: Resolver = conf.resolver @@ -981,7 +1003,8 @@ class Analyzer( // This is done by keeping the catalog and namespace in 
`AnalysisContext`, and analyzer will // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name. // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names - // with it, instead of current catalog and namespace. + // with it, instead of current catalog and namespace. Unqualified relation PATH will be + // snapshotted in `AnalysisContext.resolutionPathEntries` in a follow-up PR. private def resolveViews( plan: LogicalPlan, options: CaseInsensitiveStringMap): LogicalPlan = plan match { @@ -1090,7 +1113,7 @@ class Analyzer( case other => other }.getOrElse(u) - case u @ UnresolvedTableOrView(identifier, cmd, allowTempView) => + case u @ UnresolvedTableOrView(identifier, cmd, allowTempView, _) => lookupTableOrView(identifier).map { case _: ResolvedTempView if !allowTempView => throw QueryCompilationErrors.expectPermanentViewNotTempViewError( @@ -2078,9 +2101,14 @@ class Analyzer( throw QueryCompilationErrors.notAScalarFunctionError(nameParts.mkString("."), f) case FunctionType.NotFound => - val catalogPath = - catalogManager.currentCatalog.name +: catalogManager.currentNamespace - val searchPath = SQLConf.get.resolutionSearchPath(catalogPath.toSeq) + val catalogPath = { + val ctx = AnalysisContext.get.catalogAndNamespace + if (ctx.nonEmpty) ctx + else (catalogManager.currentCatalog.name +: + catalogManager.currentNamespace).toSeq + } + val searchPath = catalogManager + .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) .map(_.quoted) throw QueryCompilationErrors.unresolvedRoutineError( nameParts, @@ -2278,20 +2306,8 @@ class Analyzer( object ResolveProcedures extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( _.containsPattern(UNRESOLVED_PROCEDURE), ruleId) { - case UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)) => - val procedureCatalog = catalog.asProcedureCatalog - val procedure = load(procedureCatalog, ident) - 
ResolvedProcedure(procedureCatalog, ident, procedure) - } - - private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = { - try { - catalog.loadProcedure(ident) - } catch { - case e: Exception if !e.isInstanceOf[SparkThrowable] => - val nameParts = catalog.name +: ident.asMultipartIdentifier - throw QueryCompilationErrors.failedToLoadRoutineError(nameParts, e) - } + case u: UnresolvedProcedure => + functionResolution.resolveProcedure(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9d48955cbc71e..a3cc462da0e3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.catalyst.analysis -import java.util.Locale - import scala.collection.mutable import org.apache.spark.{SparkException, SparkThrowable} @@ -45,6 +43,8 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString protected def isView(nameParts: Seq[String]): Boolean + protected def conf: org.apache.spark.sql.internal.SQLConf + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ /** @@ -73,20 +73,31 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString * Contains system.session and the current catalog namespace only. Not from SQLConf. */ private def ddlSearchPathForError(catalogPath: Seq[String]): Seq[String] = { - Seq(toSQLId(Seq("system", "session")), toSQLId(catalogPath)) + val sessionPath = Seq( + CatalogManager.SYSTEM_CATALOG_NAME, + CatalogManager.SESSION_NAMESPACE) + Seq(toSQLId(sessionPath), toSQLId(catalogPath)) } /** - * `SQLConf.resolutionSearchPath` entries formatted with [[toSQLId]] for TABLE_OR_VIEW_NOT_FOUND. 
- * Same ordering as relation resolution and routine resolution search paths. + * Formats [[CatalogManager.sqlResolutionPathEntries]] with [[toSQLId]] + * for TABLE_OR_VIEW_NOT_FOUND error messages. */ private def fullSearchPathForError(catalogPath: Seq[String]): Seq[String] = { - SQLConf.get.resolutionSearchPath(catalogPath).map(toSQLId) + val catalog = catalogPath.head + val ns = catalogPath.tail.toSeq + catalogManager.sqlResolutionPathEntries(catalog, ns).map(toSQLId) } - /** Current catalog name and namespace as a path, used when computing search path for errors. */ + /** + * Catalog + namespace path for error messages. When resolving inside a view body, + * uses the view's defining catalog/namespace from AnalysisContext so the error + * reflects where the view was trying to resolve. + */ private def catalogPathForError: Seq[String] = { - (currentCatalog.name +: catalogManager.currentNamespace).toSeq + val ctx = AnalysisContext.get.catalogAndNamespace + if (ctx.nonEmpty) ctx + else (currentCatalog.name +: catalogManager.currentNamespace).toSeq } /** @@ -94,13 +105,15 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString * (e.g. DROP TEMPORARY VIEW). Contains system.session only. */ private def tempViewOnlySearchPathForError(): Seq[String] = { - Seq(toSQLId(Seq("system", "session"))) + Seq(toSQLId(Seq( + CatalogManager.SYSTEM_CATALOG_NAME, + CatalogManager.SESSION_NAMESPACE))) } /** * Search path for TABLE_OR_VIEW_NOT_FOUND on unresolved relations in SELECT/DML/INSERT/time * travel. Three-part `system.session.name` resolves only to session temp views, so only that - * scope is listed. Other names use [[fullSearchPathForError]] (resolutionSearchPath order). + * scope is listed. Other names use [[fullSearchPathForError]] (sqlResolutionPathEntries order). 
*/ private def searchPathForUnresolvedRelation(multipartIdentifier: Seq[String]): Seq[String] = { if (CatalogManager.isFullyQualifiedSystemSessionViewName(multipartIdentifier)) { @@ -381,17 +394,17 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString case u: UnresolvedTableOrView => val catalogPath = catalogPathForError - val searchPath = if (u.commandName.toUpperCase(Locale.ROOT).contains("TEMPORARY VIEW")) { - tempViewOnlySearchPathForError() - } else if (u.commandName.toUpperCase(Locale.ROOT).startsWith("DESCRIBE") || - u.commandName.toUpperCase(Locale.ROOT).startsWith("DESC ")) { - if (CatalogManager.isFullyQualifiedSystemSessionViewName(u.multipartIdentifier)) { + val searchPath = u.tableNotFoundSearchPathMode match { + case UnresolvedTableOrViewSearchPathMode.TempViewOnly => tempViewOnlySearchPathForError() - } else { - fullSearchPathForError(catalogPath) - } - } else { - ddlSearchPathForError(catalogPath) + case UnresolvedTableOrViewSearchPathMode.QueryLike => + if (CatalogManager.isFullyQualifiedSystemSessionViewName(u.multipartIdentifier)) { + tempViewOnlySearchPathForError() + } else { + fullSearchPathForError(catalogPath) + } + case UnresolvedTableOrViewSearchPathMode.Ddl => + ddlSearchPathForError(catalogPath) } u.tableNotFound(u.multipartIdentifier, searchPath) @@ -401,8 +414,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString searchPathForUnresolvedRelation(u.multipartIdentifier)) case u: UnresolvedFunctionName => - val searchPath = - SQLConf.get.resolutionSearchPath(catalogPathForError).map(_.quoted) + val searchPath = fullSearchPathForError(catalogPathForError) throw QueryCompilationErrors.unresolvedRoutineError( u.multipartIdentifier, searchPath, @@ -595,7 +607,9 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString searchPathForUnresolvedRelation(u.multipartIdentifier)) case RelationChanges(u: UnresolvedRelation, _) => - 
u.tableNotFound(u.multipartIdentifier) + u.tableNotFound( + u.multipartIdentifier, + searchPathForUnresolvedRelation(u.multipartIdentifier)) case etw: EventTimeWatermark => etw.eventTime.dataType match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala index 49792bf4d4bc0..66ab4e8fd8575 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala @@ -254,7 +254,8 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { // Resolves `UnresolvedAttribute` to its value. protected def resolveVariables(e: Expression): Expression = { - val variableResolution = new VariableResolution(catalogManager.tempVariableManager) + val variableResolution = + new VariableResolution(catalogManager.tempVariableManager, catalogManager) def resolve(nameParts: Seq[String]): Option[Expression] = { variableResolution.resolveMultipartName( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index 4f3428cc69739..b188ffb7e3737 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.analysis import java.util.concurrent.atomic.AtomicBoolean +import scala.util.control.NonFatal + +import org.apache.spark.SparkThrowable import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.FunctionIdentifier @@ -32,7 +35,8 @@ import org.apache.spark.sql.connector.catalog.{ 
CatalogPlugin, CatalogV2Util, Identifier, - LookupCatalog + LookupCatalog, + ProcedureCatalog } /** @@ -56,13 +60,13 @@ import org.apache.spark.sql.connector.catalog.functions.{ UnboundFunction } import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types._ class FunctionResolution( override val catalogManager: CatalogManager, - relationResolution: RelationResolution) + relationResolution: RelationResolution, + conf: org.apache.spark.sql.internal.SQLConf) extends DataTypeErrorsBase with LookupCatalog with Logging { private val v1SessionCatalog = catalogManager.v1SessionCatalog @@ -83,23 +87,41 @@ class FunctionResolution( /** * Produces the ordered list of candidate names for resolution. Expansion happens in two cases: * - * 1. Single-part names: expanded via the search path, where each search path entry is - * fully qualified so appending the name produces fully qualified candidates. + * 1. Single-part names: expanded via [[CatalogManager.sqlResolutionPathEntries]] (same list as + * relation resolution), where each path entry is fully qualified so appending the name + * produces fully qualified candidates. * 2. `builtin.name` or `session.name`: prepending `system` creates a fully qualified * system catalog candidate. The original 2-part name is also kept as a persistent * catalog candidate (qualified downstream). Order is controlled by * the `persistentCatalogFirst` config. * * All other multi-part names are returned as-is for downstream resolution. + * + * When [[AnalysisContext.resolutionPathEntries]] is set (view or SQL function / table function + * body with a pinned path, with [[SQLConf.PATH_ENABLED]] true), that frozen list is used + * directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays + * aligned with relation order. 
*/ + private def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = { + AnalysisContext.get.resolutionPathEntries match { + case Some(entries) if conf.pathEnabled => entries + case _ => + val pathDefault = currentCatalogPath + catalogManager.sqlResolutionPathEntries( + pathDefault.head, + pathDefault.tail.toSeq, + catalogManager.currentCatalog.name, + catalogManager.currentNamespace.toSeq) + } + } + private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = { if (nameParts.size == 1) { - val searchPath = SQLConf.get.resolutionSearchPath(currentCatalogPath) - searchPath.map(_ ++ nameParts) + sqlResolutionPathEntriesForAnalysis.map(_ ++ nameParts) } else if (nameParts.size == 2 && FunctionResolution.sessionNamespaceKind(nameParts).isDefined) { val systemCandidate = CatalogManager.SYSTEM_CATALOG_NAME +: nameParts - if (SQLConf.get.prioritizeSystemCatalog) { + if (conf.prioritizeSystemCatalog) { Seq(systemCandidate, nameParts) } else { Seq(nameParts, systemCandidate) @@ -174,9 +196,10 @@ class FunctionResolution( case None => } } - val searchPath = SQLConf.get.resolutionSearchPath(currentCatalogPath) throw QueryCompilationErrors.unresolvedRoutineError( - unresolvedFunc.nameParts, searchPath.map(toSQLId), unresolvedFunc.origin) + unresolvedFunc.nameParts, + sqlResolutionPathEntriesForAnalysis.map(toSQLId), + unresolvedFunc.origin) } } @@ -345,6 +368,25 @@ class FunctionResolution( } // Check external catalog for persistent functions + if (nameParts.length == 1) { + // Must match [[resolutionCandidates]] / [[resolveFunction]]: single-part names use PATH + + // session order, not only the current namespace (LookupCatalog single-part rule). 
+ for (candidate <- resolutionCandidates(nameParts)) { + try { + candidate match { + case CatalogAndIdentifier(catalog, ident) => + if (catalog.asFunctionCatalog.functionExists(ident)) { + return FunctionType.Persistent + } + case _ => + } + } catch { + case NonFatal(_) => + } + } + return FunctionType.NotFound + } + val CatalogAndIdentifier(catalog, ident) = relationResolution.expandIdentifier(nameParts) if (catalog.asFunctionCatalog.functionExists(ident)) { return FunctionType.Persistent @@ -592,6 +634,58 @@ class FunctionResolution( errorClass = errorClass, messageParameters = messageParameters) } + + /** + * Resolves [[UnresolvedProcedure]] for `CALL` / `DESCRIBE PROCEDURE` using the same multipart + * candidates as SQL functions and relations ([[resolutionCandidates]] / + * [[sqlResolutionPathEntriesForAnalysis]]). Catalogs that do not implement + * [[ProcedureCatalog]] are skipped for unqualified names; an explicitly catalog-qualified name + * that targets a non-[[ProcedureCatalog]] still raises + * [[QueryCompilationErrors.missingCatalogProceduresAbilityError]]. 
+ */ + def resolveProcedure(unresolved: UnresolvedProcedure): LogicalPlan = { + val candidates = resolutionCandidates(unresolved.nameParts) + for (multipart <- candidates) { + val expandedOpt = + try { + Some(relationResolution.expandIdentifier(multipart)) + } catch { + case NonFatal(_) => None + } + expandedOpt.foreach { expanded => + CatalogAndIdentifier.unapply(expanded).foreach { case (catalog, ident) => + catalog match { + case pc: ProcedureCatalog => + try { + val procedure = pc.loadProcedure(ident) + return ResolvedProcedure(pc, ident, procedure) + } catch { + case e: AnalysisException => throw e + case e: SparkThrowable => throw e + case NonFatal(e) => + val cause = e match { + case ex: Exception => ex + case th => new RuntimeException(th) + } + throw QueryCompilationErrors.failedToLoadRoutineError( + catalog.name +: ident.asMultipartIdentifier, + cause) + } + case _ => + if (unresolved.nameParts.length > 1 && + catalogManager.isCatalogRegistered(unresolved.nameParts.head) && + catalog.name().equalsIgnoreCase(unresolved.nameParts.head)) { + throw QueryCompilationErrors.missingCatalogProceduresAbilityError(catalog) + } + } + } + } + } + throw QueryCompilationErrors.unresolvedRoutineError( + unresolved.nameParts, + sqlResolutionPathEntriesForAnalysis.map(toSQLId), + unresolved.origin) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index e86248febd2eb..b4ec6a9e153e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -49,7 +49,8 @@ import org.apache.spark.util.ArrayImplicits._ class RelationResolution( override val catalogManager: CatalogManager, - sharedRelationCache: RelationCache) + sharedRelationCache: RelationCache, + sessionConf: 
Option[SQLConf] = None) extends DataTypeErrorsBase with Logging with LookupCatalog @@ -57,6 +58,9 @@ class RelationResolution( type CacheKey = (Seq[String], Option[TimeTravelSpec]) + /** Matches [[Analyzer.resolutionConf]] when the analyzer passes its session conf through. */ + override def conf: SQLConf = sessionConf.getOrElse(SQLConf.get) + val v1SessionCatalog = catalogManager.v1SessionCatalog private def relationCache: mutable.Map[CacheKey, LogicalPlan] = AnalysisContext.get.relationCache @@ -110,34 +114,61 @@ class RelationResolution( /** * Scope in the relation resolution search path. Used to interpret - * [[SQLConf.resolutionSearchPath]] when resolving unqualified table/view names. + * [[CatalogManager.sqlResolutionPathEntries]] when resolving unqualified table/view names. */ - private sealed trait RelationResolutionScope - private case object SessionScope extends RelationResolutionScope - private case object PersistentScope extends RelationResolutionScope + private sealed trait RelationResolutionStep + private case object SessionScopeStep extends RelationResolutionStep + private case class PersistentCatalogStep(catalogAndNamespace: Seq[String]) + extends RelationResolutionStep + + /** + * Path entries for unqualified relation resolution. + * + * Inside a view, [[AnalysisContext.resolutionPathEntries]] will be + * populated from the frozen path stored in view metadata (follow-up PR). + * When PATH is disabled, legacy resolution rules apply. 
+ */ + private def relationResolutionEntries: Seq[Seq[String]] = { + val pinned = AnalysisContext.get.resolutionPathEntries + if (pinned.isDefined && conf.pathEnabled) { + pinned.get + } else { + val expandCatalog = catalogManager.currentCatalog.name + val expandNamespace = catalogManager.currentNamespace.toSeq + val (pathCatalog, pathNamespace) = + if (isResolvingView) { + val p = AnalysisContext.get.catalogAndNamespace + (p.head, p.tail.toSeq) + } else { + (expandCatalog, expandNamespace) + } + catalogManager.sqlResolutionPathEntries( + pathCatalog, + pathNamespace, + expandCatalog, + expandNamespace) + } + } /** - * Returns the relation resolution search path for unqualified (1-part) names. - * Uses the single search path for all objects: [[SQLConf.resolutionSearchPath]]. - * Maps path entries: system.session -> SessionScope, system.builtin -> skip (no views), - * other (catalog path) -> PersistentScope. + * Ordered resolution steps for unqualified relation names. Each persistent path entry is kept + * with its catalog/namespace so lookup qualifies the object name under that entry (not only + * under the session's current namespace). */ - private def relationResolutionSearchPath: Seq[RelationResolutionScope] = { - val catalogPath = (currentCatalog.name +: catalogManager.currentNamespace).toSeq - conf.resolutionSearchPath(catalogPath).flatMap { - case Seq("system", "session") => Some(SessionScope) + private def relationResolutionSteps: Seq[RelationResolutionStep] = { + relationResolutionEntries.flatMap { + case p if CatalogManager.isSystemSessionPathEntry(p) => Some(SessionScopeStep) case Seq("system", "builtin") => None - case _ => Some(PersistentScope) + case entry => Some(PersistentCatalogStep(entry)) } } /** * Resolution search path formatted for TABLE_OR_VIEW_NOT_FOUND error messages. - * Same order as relationResolutionSearchPath; each entry is quoted (e.g. "`system`.`session`"). + * Same order as [[relationResolutionSteps]]; each entry is quoted (e.g. 
"`system`.`session`"). */ def resolutionSearchPathForError: Seq[String] = { - val catalogPath = (currentCatalog.name +: catalogManager.currentNamespace).toSeq - conf.resolutionSearchPath(catalogPath).map(toSQLId) + relationResolutionEntries.map(toSQLId) } /** @@ -195,15 +226,15 @@ class RelationResolution( ).orElse(tryResolvePersistent(u, identifier, finalTimeTravelSpec)) } - // 1-part name: try each scope in relationResolutionSearchPath order (from - // [[SQLConf.resolutionSearchPath]]). - val candidates = relationResolutionSearchPath - for (scope <- candidates) { - val result = scope match { - case SessionScope => + // 1-part name: try each step in [[relationResolutionSteps]] order (from + // [[CatalogManager.sqlResolutionPathEntries]]). + val steps = relationResolutionSteps + for (step <- steps) { + val result = step match { + case SessionScopeStep => resolveTempView(identifier, u.isStreaming, finalTimeTravelSpec.isDefined) - case PersistentScope => - tryResolvePersistent(u, identifier, finalTimeTravelSpec) + case PersistentCatalogStep(prefix) => + tryResolvePersistent(u, prefix ++ identifier, finalTimeTravelSpec) } if (result.isDefined) return result } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index f34c6be9954e9..f7319e9b03e84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -79,6 +79,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager) throw new AnalysisException( "UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", Map.empty) } + if (nameParts.length == 1 && + !catalogManager.sessionScopeUnqualifiedAllowed( + catalogManager.currentCatalog.name(), + catalogManager.currentNamespace.toSeq)) { + throw 
QueryCompilationErrors.unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION")) + } val resolved = catalogManager.tempVariableManager.qualify(nameParts.last) assertValidSessionVariableNameParts(nameParts, resolved) d.copy(name = resolved) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala index 55622637f3046..b47332ace2b85 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala @@ -34,7 +34,8 @@ class ResolveFetchCursor(val catalogManager: CatalogManager) extends Rule[Logica with ColumnResolutionHelper { // VariableResolution looks up both scripting local variables (via SqlScriptingContextManager) // and session variables (via tempVariableManager), checking local variables first. - private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) + private val variableResolution = + new VariableResolution(catalogManager.tempVariableManager, catalogManager) /** * Checks for duplicate variable names and throws an exception if found. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala index 4b16448641bc1..6ecbc87d35530 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.types.IntegerType */ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with ColumnResolutionHelper { - private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) + private val variableResolution = + new VariableResolution(catalogManager.tempVariableManager, catalogManager) /** * Checks for duplicate variable names and throws an exception if found. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala index 0095885c0135d..f8cce0d6f821e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala @@ -33,7 +33,21 @@ import org.apache.spark.sql.connector.catalog.{ Identifier } -class VariableResolution(tempVariableManager: TempVariableManager) extends SQLConfHelper { +class VariableResolution( + tempVariableManager: TempVariableManager, + catalogManager: CatalogManager) + extends SQLConfHelper { + + /** + * Unqualified session variables resolve only when SYSTEM.SESSION is on the SQL path + * (PATH enabled and explicitly set). 
+ */ + private def allowUnqualifiedSessionTempVariableLookup(nameParts: Seq[String]): Boolean = { + if (nameParts.length != 1) return true + catalogManager.sessionScopeUnqualifiedAllowed( + catalogManager.currentCatalog.name(), + catalogManager.currentNamespace.toSeq) + } /** * Resolves a `multipartName` to an [[Expression]] tree, supporting nested field access. @@ -125,7 +139,8 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo ) } .orElse( - if (maybeTempVariableName(nameParts)) { + if (maybeTempVariableName(nameParts) && + allowUnqualifiedSessionTempVariableLookup(nameParts)) { tempVariableManager .get(namePartsCaseAdjusted) .map { varDef => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala index 3fc6438597300..94c3c24cbb1a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala @@ -378,6 +378,7 @@ object HybridAnalyzer { extensions = legacyAnalyzer.singlePassResolverExtensions, metadataResolverExtensions = legacyAnalyzer.singlePassMetadataResolverExtensions, externalRelationResolution = Some(relationResolution), + conf = legacyAnalyzer.resolutionConf, extendedRewriteRules = legacyAnalyzer.singlePassPostHocResolutionRules, tracker = Some(tracker) ), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala index 1c9a296af9ba4..2c39fe71e62ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala @@ -44,6 +44,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{ } import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.Metadata @@ -974,13 +975,14 @@ class NameScope( */ class NameScopeStack( tempVariableManager: TempVariableManager, + catalogManager: CatalogManager, subqueryRegistry: SubqueryRegistry, planLogger: PlanLogger = new PlanLogger) extends SQLConfHelper { private val stack = new ArrayDeque[NameScope] stack.push(new NameScope(planLogger = planLogger)) - private val variableResolution = new VariableResolution(tempVariableManager) + private val variableResolution = new VariableResolution(tempVariableManager, catalogManager) /** * Get the current scope, which is a default choice for name resolution. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala index 35d752779d6c6..5bb92d1366925 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala @@ -53,8 +53,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.util.EvaluateUnresolvedInlineTable import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.errors.QueryErrorsBase +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf /** @@ -86,6 +85,7 @@ class Resolver( override val extensions: Seq[ResolverExtension] = Seq.empty, metadataResolverExtensions: Seq[ResolverExtension] = Seq.empty, 
externalRelationResolution: Option[RelationResolution] = None, + conf: SQLConf = SQLConf.get, extendedRewriteRules: Seq[Rule[LogicalPlan]] = Seq.empty, tracker: Option[QueryPlanningTracker] = None) extends LogicalPlanResolver @@ -96,6 +96,7 @@ class Resolver( private val subqueryRegistry = new SubqueryRegistry private val scopes = new NameScopeStack( tempVariableManager = catalogManager.tempVariableManager, + catalogManager = catalogManager, subqueryRegistry = subqueryRegistry, planLogger = planLogger ) @@ -105,7 +106,7 @@ class Resolver( private val relationResolution = externalRelationResolution.getOrElse { Resolver.createRelationResolution(catalogManager, sharedRelationCache) } - private val functionResolution = new FunctionResolution(catalogManager, relationResolution) + private val functionResolution = new FunctionResolution(catalogManager, relationResolution, conf) private val expressionResolver = new ExpressionResolver(this, functionResolution, planLogger) private val aggregateResolver = new AggregateResolver(this, expressionResolver) private val expressionIdAssigner = expressionResolver.getExpressionIdAssigner @@ -585,7 +586,9 @@ class Resolver( val multipartId = unresolvedRelation.multipartIdentifier val catalogPath = (catalogManager.currentCatalog.name() +: catalogManager.currentNamespace).toSeq - val searchPath = SQLConf.get.resolutionSearchPath(catalogPath).map(toSQLId) + val searchPath = catalogManager + .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) + .map(toSQLId) unresolvedRelation.tableNotFound(multipartId, searchPath) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala index e2171b84b6eb2..ed41c320f8c9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala @@ -474,9 +474,8 @@ class ResolverGuard( private def checkUnresolvedFunction(unresolvedFunction: UnresolvedFunction) = { val nameParts = unresolvedFunction.nameParts val funcName = nameParts.last.toLowerCase(Locale.ROOT) - - if (nameParts.length == 1) { - // Unqualified: same as master (unsupported, non-builtin, or check children) + if (nameParts.size == 1) { + // Unqualified: reject if unsupported, else non-builtin or check children (same as master) if (isUnsupportedFunction(funcName)) { Some(s"unsupported function ${funcName}") } else if (!isBuiltinFunction(funcName)) { @@ -493,7 +492,7 @@ class ResolverGuard( unresolvedFunction.children.collectFirst { case CheckExpression(reason) => reason } } } else if (FunctionResolution.sessionNamespaceKind(nameParts).isDefined) { - // Session-qualified: allow through (system-first behavior) + // Session-qualified: allow through (PATH + system-first) unresolvedFunction.children.collectFirst { case CheckExpression(reason) => reason } } else { Some("multi-part function name") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index fd05b7cd5a2a7..5bf25d6aa89cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -68,15 +68,39 @@ case class UnresolvedView( allowTemp: Boolean, suggestAlternative: Boolean = false) extends UnresolvedLeafNode +/** + * Controls which search path is shown in `TABLE_OR_VIEW_NOT_FOUND` for + * [[UnresolvedTableOrView]] (see [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]). 
+ */ +sealed trait UnresolvedTableOrViewSearchPathMode + +object UnresolvedTableOrViewSearchPathMode { + /** DDL on catalog objects: `system.session` and current catalog namespace only. */ + case object Ddl extends UnresolvedTableOrViewSearchPathMode + /** + * Like `SELECT` / DML: full `sqlResolutionPathEntries` order; fully qualified + * `system.session.*` names still use the temp-view-only path in errors. + */ + case object QueryLike extends UnresolvedTableOrViewSearchPathMode + /** Commands that only target temp views (e.g. some `DROP TEMPORARY VIEW` paths). */ + case object TempViewOnly extends UnresolvedTableOrViewSearchPathMode +} + /** * Holds the name of a table or view that has yet to be looked up in a catalog. It will * be resolved to [[ResolvedTable]], [[ResolvedPersistentView]] or [[ResolvedTempView]] during * analysis. + * + * @param tableNotFoundSearchPathMode how to format `searchPath` in `TABLE_OR_VIEW_NOT_FOUND`; + * set explicitly at parse / construction time (not inferred + * from [[commandName]]). 
*/ case class UnresolvedTableOrView( multipartIdentifier: Seq[String], commandName: String, - allowTempView: Boolean) extends UnresolvedLeafNode + allowTempView: Boolean, + tableNotFoundSearchPathMode: UnresolvedTableOrViewSearchPathMode = + UnresolvedTableOrViewSearchPathMode.Ddl) extends UnresolvedLeafNode sealed trait PartitionSpec extends LeafExpression with Unevaluable { override def dataType: DataType = throw SparkException.internalError( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a346856c520ad..b85321f1944e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -4286,8 +4286,12 @@ class AstBuilder extends DataTypeAstBuilder protected def createUnresolvedTableOrView( ctx: IdentifierReferenceContext, commandName: String, - allowTempView: Boolean = true): LogicalPlan = withOrigin(ctx) { - withIdentClause(ctx, UnresolvedTableOrView(_, commandName, allowTempView)) + allowTempView: Boolean = true, + tableNotFoundSearchPathMode: UnresolvedTableOrViewSearchPathMode = + UnresolvedTableOrViewSearchPathMode.Ddl): LogicalPlan = withOrigin(ctx) { + withIdentClause( + ctx, + UnresolvedTableOrView(_, commandName, allowTempView, tableNotFoundSearchPathMode)) } private def createUnresolvedTableOrView( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 7df836cea6124..3f5afd9ce0de7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -164,6 +164,43 @@ class CatalogManager( } } + /** + * Ordered catalog/schema path entries 
for resolving unqualified SQL object names. + * When PATH is off or unset, applies [[SQLConf.defaultPathOrder]] (legacy). + * When PATH is explicitly set, uses the resolved stored path entries. + */ + def sqlResolutionPathEntries( + pathDefaultCatalog: String, + pathDefaultNamespace: Seq[String], + expandCatalog: String, + expandNamespace: Seq[String]): Seq[Seq[String]] = synchronized { + val defaultEntry = + if (pathDefaultNamespace.isEmpty) Seq(pathDefaultCatalog) + else pathDefaultCatalog +: pathDefaultNamespace + val stored = if (conf.pathEnabled) _sessionPath else None + stored match { + case Some(entries) => + CatalogManager.resolvePathEntries(entries, expandCatalog, expandNamespace) + case None => + conf.defaultPathOrder(Seq(defaultEntry)) + } + } + + /** Session-catalog overload. */ + def sqlResolutionPathEntries( + currentCatalog: String, + currentNamespace: Seq[String]): Seq[Seq[String]] = + sqlResolutionPathEntries( + currentCatalog, currentNamespace, + currentCatalog, currentNamespace) + + /** True if [[sqlResolutionPathEntries]] includes `system.session`. 
*/ + def sessionScopeUnqualifiedAllowed( + currentCatalog: String, + currentNamespace: Seq[String]): Boolean = + sqlResolutionPathEntries(currentCatalog, currentNamespace) + .exists(CatalogManager.isSystemSessionPathEntry) + private var _currentCatalogName: Option[String] = None def currentCatalog: CatalogPlugin = synchronized { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala index 9685ed5c6d256..63d5523be072d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryTable, InMemoryTableCatalog, Table} import org.apache.spark.sql.connector.catalog.TableWritePrivilege import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class TableLookupCacheSuite extends AnalysisTest with Matchers { @@ -74,6 +75,14 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers { when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog) when(catalogManager.currentCatalog).thenReturn(v2Catalog) when(catalogManager.currentNamespace).thenReturn(Array("default")) + when(catalogManager.sessionPathEntries).thenReturn(None) + val defaultPath = SQLConf.get.resolutionSearchPath( + (v2Catalog.name() +: Array("default")).toSeq) + when(catalogManager.sqlResolutionPathEntries( + any[String], any[Seq[String]], any[String], any[Seq[String]])) + .thenReturn(defaultPath) + when(catalogManager.sqlResolutionPathEntries(any[String], any[Seq[String]])) + .thenReturn(defaultPath) new Analyzer(catalogManager) } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala index 8897d65654540..1f4b3dea81b31 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala @@ -37,7 +37,10 @@ class TimezoneAwareExpressionResolverSuite extends SparkFunSuite { extends ExpressionResolver( resolver = new Resolver(catalogManager), functionResolution = - new FunctionResolution(catalogManager, Resolver.createRelationResolution(catalogManager)), + new FunctionResolution( + catalogManager, + Resolver.createRelationResolution(catalogManager), + org.apache.spark.sql.internal.SQLConf.get), planLogger = new PlanLogger ) { override def resolve(expression: Expression): Expression = resolvedExpression diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes.explain index 413eaf8f7a686..9de2e887e1ee6 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes.explain @@ -1,2 +1,2 @@ -SubqueryAlias primary.tempdb.myTable -+- RelationV2[id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0] primary.tempdb.myTable +SubqueryAlias spark_catalog.tempdb.myTable ++- RelationV2[id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0] spark_catalog.tempdb.myTable diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes_with_options.explain 
index 413eaf8f7a686..9de2e887e1ee6 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes_with_options.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/read_changes_with_options.explain @@ -1,2 +1,2 @@ -SubqueryAlias primary.tempdb.myTable -+- RelationV2[id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0] primary.tempdb.myTable +SubqueryAlias spark_catalog.tempdb.myTable ++- RelationV2[id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0] spark_catalog.tempdb.myTable diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/read_table.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/read_table.explain index 979084f06a87a..e5dce0fe05742 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/read_table.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/read_table.explain @@ -1,2 +1,2 @@ -SubqueryAlias primary.tempdb.myTable -+- RelationV2[id#0L] primary.tempdb.myTable +SubqueryAlias spark_catalog.tempdb.myTable ++- RelationV2[id#0L] spark_catalog.tempdb.myTable diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain index 6f12567607ac0..a0316c9dba3a3 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain @@ -1,2 +1,2 @@ -~SubqueryAlias primary.tempdb.myStreamingTable -+- ~StreamingRelationV2 primary.tempdb.myStreamingTable_changelog, ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1, endingVersion=Optional.empty, startingBoundInclusive=true, 
endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, computeUpdates=false}), [startingVersion=1, deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, name= +~SubqueryAlias spark_catalog.tempdb.myStreamingTable ++- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable_changelog, ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1, endingVersion=Optional.empty, startingBoundInclusive=true, endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, computeUpdates=false}), [startingVersion=1, deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, name= diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain index 9ea4ad218a5f4..dc17d3503894d 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain @@ -1,2 +1,2 @@ -~SubqueryAlias primary.tempdb.myStreamingTable -+- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, name= +~SubqueryAlias spark_catalog.tempdb.myStreamingTable ++- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, 
name= diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/table.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/table.explain index 979084f06a87a..e5dce0fe05742 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/table.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/table.explain @@ -1,2 +1,2 @@ -SubqueryAlias primary.tempdb.myTable -+- RelationV2[id#0L] primary.tempdb.myTable +SubqueryAlias spark_catalog.tempdb.myTable ++- RelationV2[id#0L] spark_catalog.tempdb.myTable diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/table_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/table_API_with_options.explain index 979084f06a87a..e5dce0fe05742 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/table_API_with_options.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/table_API_with_options.explain @@ -1,2 +1,2 @@ -SubqueryAlias primary.tempdb.myTable -+- RelationV2[id#0L] primary.tempdb.myTable +SubqueryAlias spark_catalog.tempdb.myTable ++- RelationV2[id#0L] spark_catalog.tempdb.myTable diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala index 506a75ca3b4c3..fabbf3071c4ec 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.connect.proto import org.apache.spark.internal.LogKeys.PATH import org.apache.spark.sql.catalyst.{catalog, QueryPlanningTracker} -import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer, 
FunctionRegistry, Resolver, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer, FunctionRegistry, RelationCache, Resolver, TableFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.{ReplaceExpressions, RewriteWithExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -133,9 +133,25 @@ class ProtoToParsedPlanTestSuite protected val goldenFilePath: Path = suiteBaseResourcePath.resolve("explain-results") private val emptyProps: util.Map[String, String] = util.Collections.emptyMap() + /** + * Isolated from [[SharedSparkSession]] so PATH / session path settings do not affect catalog. + */ + private val analyzerIsolationConf: SQLConf = { + val c = new SQLConf() + c.setConf(SQLConf.PATH_ENABLED, false) + // Match [[sparkConf]]: a bare SQLConf defaults ANSI_ENABLED to true, which changes + // function signatures in analyzed plans (e.g. make_date) vs golden files. + c.setConf(SQLConf.ANSI_ENABLED, false) + c + } + private val analyzer = { val inMemoryCatalog = new InMemoryChangelogCatalog - inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty()) + // Name must match [[CatalogManager.SESSION_CATALOG_NAME]]: path entries use + // [[currentCatalog.name()]], then resolution calls [[catalogManager.catalog]] on that segment. + inMemoryCatalog.initialize( + CatalogManager.SESSION_CATALOG_NAME, + CaseInsensitiveStringMap.empty()) inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps) inMemoryCatalog.createTable( Identifier.of(Array("tempdb"), "myTable"), @@ -154,10 +170,12 @@ class ProtoToParsedPlanTestSuite new catalog.InMemoryCatalog(), FunctionRegistry.builtin, TableFunctionRegistry.builtin)) - catalogManager.setCurrentCatalog("primary") + // Do not call setCurrentCatalog("primary"): that loads a separate plugin via + // Catalogs.load("primary", conf) instead of using defaultSessionCatalog (inMemoryCatalog). 
+ // Leave current catalog as default spark_catalog so v2SessionCatalog returns inMemoryCatalog. catalogManager.setCurrentNamespace(Array("tempdb")) - new Analyzer(catalogManager) { + new Analyzer(catalogManager, RelationCache.empty, Some(analyzerIsolationConf)) { override def resolver: Resolver = caseSensitiveResolution } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala index 35041feca9e18..9a5aed333a4c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala @@ -409,10 +409,12 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog with Logging { val catalogPath = (Seq(currentCatalog()) ++ sparkSession.sessionState.catalogManager.currentNamespace).toSeq - val searchPath = sparkSession.sessionState.conf.resolutionSearchPath(catalogPath) + val searchPath = sparkSession.sessionState.catalogManager + .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) + .map(_.quoted) throw QueryCompilationErrors.unresolvedRoutineError( ident, - searchPath.map(_.quoted), + searchPath, plan.origin) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b910a3dd6d8aa..8f4c77840f0cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, GlobalTempView, LocalTempView, PersistedView, PlanWithUnresolvedIdentifier, SchemaEvolution, SchemaTypeEvolution, UnresolvedAttribute, - UnresolvedIdentifier, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedProcedure} + 
UnresolvedIdentifier, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedProcedure, + UnresolvedTableOrViewSearchPathMode} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.parser._ @@ -1450,7 +1451,11 @@ class SparkSqlAstBuilder extends AstBuilder { val tableName = ctx.identifierReference.getText.split("\\.").lastOption.getOrElse("table") throw QueryCompilationErrors.describeJsonNotExtendedError(tableName) } - val relation = createUnresolvedTableOrView(ctx.identifierReference, "DESCRIBE TABLE") + val relation = createUnresolvedTableOrView( + ctx.identifierReference, + "DESCRIBE TABLE", + allowTempView = true, + UnresolvedTableOrViewSearchPathMode.QueryLike) if (ctx.describeColName != null) { if (ctx.partitionSpec != null) { throw QueryParsingErrors.descColumnForPartitionUnsupportedError(ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index e248f0eea96de..4a9bebe75cff1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -108,8 +108,8 @@ case class SetCommand(kv: Option[(String, Option[String])]) } if (varName.nonEmpty && varName.length <= 3) { val variableResolution = new VariableResolution( - sparkSession.sessionState.analyzer.catalogManager.tempVariableManager - ) + sparkSession.sessionState.analyzer.catalogManager.tempVariableManager, + sparkSession.sessionState.analyzer.catalogManager) val variable = variableResolution.lookupVariable( nameParts = varName ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 9bd68cbe72a07..9bbdbc6b22249 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -189,7 +189,7 @@ abstract class BaseSessionStateBuilder( * * Note: this depends on the `conf` and `catalog` fields. */ - protected def analyzer: Analyzer = new Analyzer(catalogManager, sharedRelationCache) { + protected def analyzer: Analyzer = new Analyzer(catalogManager, sharedRelationCache, Some(conf)) { override val hintResolutionRules: Seq[Rule[LogicalPlan]] = customHintResolutionRules diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out index 36985a0ec628a..3d5db15a48113 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out @@ -707,7 +707,7 @@ struct -- !query output == Parsed Logical Plan == 'DescribeRelation false, [col_name#x, data_type#x, comment#x] -+- 'UnresolvedTableOrView [t], DESCRIBE TABLE, true ++- 'UnresolvedTableOrView [t], DESCRIBE TABLE, true, QueryLike == Analyzed Logical Plan == col_name: string, data_type: string, comment: string diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index d62827dea2371..ec5151c77858c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -320,18 +320,22 @@ class SetPathSuite extends QueryTest with SharedSparkSession { test("PATH enabled: multi-level namespace (3+ parts) is accepted") { withPathEnabled { - sql("SET PATH = iceberg_cat.db1.db2, spark_catalog.default") - val entries = pathEntries(currentPath()) - assert(entries.head === "iceberg_cat.db1.db2", - s"Multi-level namespace should be accepted; got: $entries") + // SET PATH should accept multi-level namespaces without 
error. + // We verify the path is stored correctly via the CatalogManager API + // rather than currentPath(), which would fail because spark_catalog + // only supports single-part namespaces. + sql("SET PATH = spark_catalog.ns1.ns2, spark_catalog.default") + val stored = spark.sessionState.catalogManager.sessionPathEntries + assert(stored.isDefined, "Session path should be stored") + assert(stored.get.length == 2, s"Should have 2 entries, got: ${stored.get}") } } test("PATH enabled: backtick-quoted identifiers with dots round-trip correctly") { withPathEnabled { - sql("SET PATH = `cat.a`.`sch.b`") + sql("SET PATH = spark_catalog.`sch.b`, system.builtin") val entries = pathEntries(currentPath()) - assert(entries === Seq("`cat.a`.`sch.b`"), + assert(entries.head === "spark_catalog.`sch.b`", s"Backtick-quoted identifiers should round-trip; got: $entries") } } @@ -382,6 +386,68 @@ class SetPathSuite extends QueryTest with SharedSparkSession { // TODO: cloneSession() constructs a new CatalogManager per forked session and // explicitly copies only the stored session path via copySessionPathFrom. // Other CatalogManager state propagation (current catalog/namespace, registered - // catalogs) on clone is currently incidental — audit and pin down the intended + // catalogs) on clone is currently incidental -- audit and pin down the intended // semantics in a follow-up. 
+ + // --- Resolution tests: verify SET PATH affects actual table/function lookup --- + + test("PATH enabled: table resolves from first matching path entry") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_res_a") + sql("CREATE SCHEMA IF NOT EXISTS path_res_b") + sql("CREATE TABLE path_res_a.tbl (x INT) USING parquet") + sql("CREATE TABLE path_res_b.tbl (x INT) USING parquet") + sql("INSERT INTO path_res_a.tbl VALUES (1)") + sql("INSERT INTO path_res_b.tbl VALUES (2)") + try { + sql("SET PATH = spark_catalog.path_res_a, spark_catalog.path_res_b, system.builtin") + checkAnswer(sql("SELECT x FROM tbl"), Row(1)) + sql("SET PATH = spark_catalog.path_res_b, spark_catalog.path_res_a, system.builtin") + checkAnswer(sql("SELECT x FROM tbl"), Row(2)) + } finally { + sql("DROP TABLE IF EXISTS path_res_a.tbl") + sql("DROP TABLE IF EXISTS path_res_b.tbl") + sql("DROP SCHEMA IF EXISTS path_res_a") + sql("DROP SCHEMA IF EXISTS path_res_b") + } + } + } + + test("PATH enabled: function resolves from first matching path entry") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_fn_a") + sql("CREATE SCHEMA IF NOT EXISTS path_fn_b") + sql("CREATE FUNCTION path_fn_a.pick() RETURNS INT RETURN 1") + sql("CREATE FUNCTION path_fn_b.pick() RETURNS INT RETURN 2") + try { + sql("SET PATH = spark_catalog.path_fn_a, spark_catalog.path_fn_b, system.builtin") + checkAnswer(sql("SELECT pick()"), Row(1)) + sql("SET PATH = spark_catalog.path_fn_b, spark_catalog.path_fn_a, system.builtin") + checkAnswer(sql("SELECT pick()"), Row(2)) + } finally { + sql("DROP FUNCTION IF EXISTS path_fn_a.pick") + sql("DROP FUNCTION IF EXISTS path_fn_b.pick") + sql("DROP SCHEMA IF EXISTS path_fn_a") + sql("DROP SCHEMA IF EXISTS path_fn_b") + } + } + } + + test("PATH enabled: unqualified table fails when schema not in path") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_miss") + sql("CREATE TABLE path_miss.hidden (x INT) USING parquet") + try { + sql("SET PATH = 
spark_catalog.default, system.builtin") + val err = intercept[AnalysisException] { + sql("SELECT * FROM hidden") + } + assert(err.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"), + s"Expected TABLE_OR_VIEW_NOT_FOUND, got: ${err.getMessage}") + } finally { + sql("DROP TABLE IF EXISTS path_miss.hidden") + sql("DROP SCHEMA IF EXISTS path_miss") + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala index 257417c6b16b7..3161896e9ed5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala @@ -1220,6 +1220,7 @@ class NameScopeSuite extends QueryTest with SharedSparkSession { private def newNameScopeStack() = new NameScopeStack( tempVariableManager = spark.sessionState.analyzer.catalogManager.tempVariableManager, + catalogManager = spark.sessionState.analyzer.catalogManager, subqueryRegistry = new SubqueryRegistry ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala index 14cf72c78dbee..1a6dc178b6e51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala @@ -188,6 +188,13 @@ abstract class AlignAssignmentsSuiteBase extends AnalysisTest { when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) when(manager.tempVariableManager).thenReturn(tempVariableManager) + when(manager.sessionPathEntries).thenReturn(None) + val defaultPath = SQLConf.get.resolutionSearchPath(Seq(v2Catalog.name())) + when(manager.sqlResolutionPathEntries( + any[String], 
any[Seq[String]], any[String], any[Seq[String]])) + .thenReturn(defaultPath) + when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) + .thenReturn(defaultPath) manager } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index d9b91946ded77..9997b6c7bb385 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -731,7 +731,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { parser.parsePlan(sql).collect { case CreateTableLike( UnresolvedIdentifier(targetParts, _), - UnresolvedTableOrView(sourceParts, _, _), + UnresolvedTableOrView(sourceParts, _, _, _), loc, p, _, pr, e) => (targetParts, sourceParts, loc, p, pr, e) }.head diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 89fb6eca223ee..9fc5111427303 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -221,6 +221,13 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { when(manager.currentNamespace).thenReturn(Array.empty[String]) when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) when(manager.tempVariableManager).thenReturn(tempVariableManager) + when(manager.sessionPathEntries).thenReturn(None) + val defaultPath = SQLConf.get.resolutionSearchPath(Seq(testCat.name())) + when(manager.sqlResolutionPathEntries( + any[String], any[Seq[String]], any[String], any[Seq[String]])) + .thenReturn(defaultPath) + when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) + .thenReturn(defaultPath) 
manager } @@ -230,6 +237,8 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { invocation.getArguments()(0).asInstanceOf[String] match { case "testcat" => testCat + case CatalogManager.SESSION_CATALOG_NAME => + v2SessionCatalog case name => throw QueryExecutionErrors.catalogNotFoundError(name) } }) @@ -237,6 +246,14 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { when(manager.currentNamespace).thenReturn(Array("default")) when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) when(manager.tempVariableManager).thenReturn(tempVariableManager) + when(manager.sessionPathEntries).thenReturn(None) + val defaultPath2 = SQLConf.get.resolutionSearchPath( + (v2SessionCatalog.name() +: Array("default")).toSeq) + when(manager.sqlResolutionPathEntries( + any[String], any[Seq[String]], any[String], any[Seq[String]])) + .thenReturn(defaultPath2) + when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) + .thenReturn(defaultPath2) manager } From 99415fd4d5e6a851529a60395d1e245acfeaa402 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Sun, 26 Apr 2026 19:25:22 -0700 Subject: [PATCH 2/9] [SPARK-56605][SQL] Address review follow-ups in PATH wiring Align procedure PATH iteration with table/function behavior, remove dead search-path mode, and clean up analyzer path/conf wiring to match the design intended for this PR of the SPARK-54810 PATH series.
--- .../spark/sql/catalyst/analysis/Analyzer.scala | 14 +++++--------- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 +--- .../sql/catalyst/analysis/FunctionResolution.scala | 8 ++++++++ .../sql/catalyst/analysis/RelationResolution.scala | 2 ++ .../analysis/resolver/HybridAnalyzer.scala | 1 - .../sql/catalyst/analysis/v2ResolutionPlans.scala | 4 +--- .../spark/sql/connector/ProcedureSuite.scala | 12 ++++++++++++ 7 files changed, 29 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 81232aac909df..a41558d101109 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -140,8 +140,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery. * @param resolutionPathEntries When resolving a view body, the ordered path for unqualified - * relation names (see [[AnalysisContext.withAnalysisContext]]). - * [[None]] outside views: compute from session + * relation names. Stays [[None]] in this PR; population from the + * frozen path stored in view metadata is wired in a follow-up. + * Outside views: compute from session * [[CatalogManager.sqlResolutionPathEntries]]. */ case class AnalysisContext( @@ -154,7 +155,7 @@ case class AnalysisContext( mutable.Map.empty, referredTempViewNames: Seq[Seq[String]] = Seq.empty, // 1. If we are resolving a view, this field will be restored from the view metadata, - // by calling `AnalysisContext.withAnalysisContext(viewDesc, catalogManager)`. + // by calling `AnalysisContext.withAnalysisContext(viewDesc)`. // 2. 
If we are not resolving a view, this field will be updated everytime the analyzer // lookup a temporary function. And export to the view metadata. referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty, @@ -2101,12 +2102,7 @@ class Analyzer( throw QueryCompilationErrors.notAScalarFunctionError(nameParts.mkString("."), f) case FunctionType.NotFound => - val catalogPath = { - val ctx = AnalysisContext.get.catalogAndNamespace - if (ctx.nonEmpty) ctx - else (catalogManager.currentCatalog.name +: - catalogManager.currentNamespace).toSeq - } + val catalogPath = catalogPathForError val searchPath = catalogManager .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) .map(_.quoted) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index a3cc462da0e3d..b923d442e6d98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -94,7 +94,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString * uses the view's defining catalog/namespace from AnalysisContext so the error * reflects where the view was trying to resolve. 
*/ - private def catalogPathForError: Seq[String] = { + protected final def catalogPathForError: Seq[String] = { val ctx = AnalysisContext.get.catalogAndNamespace if (ctx.nonEmpty) ctx else (currentCatalog.name +: catalogManager.currentNamespace).toSeq @@ -395,8 +395,6 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString case u: UnresolvedTableOrView => val catalogPath = catalogPathForError val searchPath = u.tableNotFoundSearchPathMode match { - case UnresolvedTableOrViewSearchPathMode.TempViewOnly => - tempViewOnlySearchPathForError() case UnresolvedTableOrViewSearchPathMode.QueryLike => if (CatalogManager.isFullyQualifiedSystemSessionViewName(u.multipartIdentifier)) { tempViewOnlySearchPathForError() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index b188ffb7e3737..e0272e219df04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -645,6 +645,7 @@ class FunctionResolution( */ def resolveProcedure(unresolved: UnresolvedProcedure): LogicalPlan = { val candidates = resolutionCandidates(unresolved.nameParts) + val skipCandidateFailures = unresolved.nameParts.length == 1 for (multipart <- candidates) { val expandedOpt = try { @@ -660,6 +661,13 @@ class FunctionResolution( val procedure = pc.loadProcedure(ident) return ResolvedProcedure(pc, ident, procedure) } catch { + // ProcedureCatalog has no standard "not found" exception type today. For + // unqualified names searched through PATH, treat candidate failures as misses and + // continue to the next entry (matching table/function PATH iteration semantics). + // Explicitly catalog-qualified names still preserve existing error behavior. 
+ case _: AnalysisException if skipCandidateFailures => + case _: SparkThrowable if skipCandidateFailures => + case NonFatal(_) if skipCandidateFailures => case e: AnalysisException => throw e case e: SparkThrowable => throw e case NonFatal(e) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index b4ec6a9e153e4..64773da917c96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -133,6 +133,8 @@ class RelationResolution( if (pinned.isDefined && conf.pathEnabled) { pinned.get } else { + // Keep expanding CurrentSchemaEntry using the live session catalog/namespace until the + // follow-up PR wires frozen resolutionPathEntries for view analysis. val expandCatalog = catalogManager.currentCatalog.name val expandNamespace = catalogManager.currentNamespace.toSeq val (pathCatalog, pathNamespace) = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala index 94c3c24cbb1a8..3fc6438597300 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala @@ -378,7 +378,6 @@ object HybridAnalyzer { extensions = legacyAnalyzer.singlePassResolverExtensions, metadataResolverExtensions = legacyAnalyzer.singlePassMetadataResolverExtensions, externalRelationResolution = Some(relationResolution), - conf = legacyAnalyzer.resolutionConf, extendedRewriteRules = legacyAnalyzer.singlePassPostHocResolutionRules, tracker = Some(tracker) ), diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 5bf25d6aa89cb..046acd20a9e3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -79,11 +79,9 @@ object UnresolvedTableOrViewSearchPathMode { case object Ddl extends UnresolvedTableOrViewSearchPathMode /** * Like `SELECT` / DML: full `sqlResolutionPathEntries` order; fully qualified - * `system.session.*` names still use the temp-view-only path in errors. + * `system.session.*` names still use the session-only path in errors. */ case object QueryLike extends UnresolvedTableOrViewSearchPathMode - /** Commands that only target temp views (e.g. some `DROP TEMPORARY VIEW` paths). */ - case object TempViewOnly extends UnresolvedTableOrViewSearchPathMode } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ProcedureSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ProcedureSuite.scala index dc35bd75b5695..82e3f4f6537da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ProcedureSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ProcedureSuite.scala @@ -222,6 +222,18 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft } } + test("PATH enabled: unqualified CALL skips missing candidate and keeps searching") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + try { + catalog("cat2").createProcedure(Identifier.of(Array("ns_hit"), "sum"), UnboundLongSum) + sql("SET PATH = cat.ns_miss, cat2.ns_hit") + checkAnswer(sql("CALL sum(1, 2)"), Row(3L) :: Nil) + } finally { + sql("SET PATH = DEFAULT_PATH") + } + } + } + test("required parameter not found") { catalog.createProcedure(Identifier.of(Array("ns"), "sum"), UnboundSum) 
checkError( From 4b1449f3dbd9aad7f852587b99f79138bd62586b Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Mon, 27 Apr 2026 05:21:19 -0700 Subject: [PATCH 3/9] [SPARK-56605][SQL] Preserve nested SQLConf during analyzer isolation Avoid overriding explicit nested SQLConf scopes while running with analyzer sessionConf so persisted view/UDF settings (including ANSI mode) remain effective during analysis. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a41558d101109..b676160356ef9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -363,6 +363,9 @@ class Analyzer( } private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match { + // Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view configs). + // Otherwise, run analysis with the captured session conf for analyzer isolation. + case Some(c) if SQLConf.get ne c => super.execute(plan) case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) } case None => super.execute(plan) } From 753ed80f02954a71de3421caaf8e554909e1ea67 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Mon, 27 Apr 2026 07:07:09 -0700 Subject: [PATCH 4/9] [SPARK-56605][SQL] Trigger CI rerun Create an empty commit to retrigger GitHub Actions checks. From a5838c43d37972b82d7c09f6c545d9de8b2e320d Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Mon, 27 Apr 2026 09:42:19 -0700 Subject: [PATCH 5/9] [SPARK-56605][SQL] Update DESCRIBE parser expectations for search-path mode Adjust DescribeTableParserSuite to expect QueryLike mode for DESCRIBE TABLE unresolved relations and keep parser plan assertions aligned with current parser behavior. 
--- .../command/DescribeTableParserSuite.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala index 436fa2e2389aa..ebe8eaf91d56a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala @@ -19,33 +19,40 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, - UnresolvedPartitionSpec, UnresolvedTableOrView} + UnresolvedPartitionSpec, UnresolvedTableOrView, UnresolvedTableOrViewSearchPathMode} import org.apache.spark.sql.catalyst.plans.logical.{DescribeColumn, DescribeRelation, DescribeTablePartition} import org.apache.spark.sql.test.SharedSparkSession class DescribeTableParserSuite extends SharedSparkSession with AnalysisTest { private def parsePlan(statement: String) = spark.sessionState.sqlParser.parsePlan(statement) + private def unresolvedDescribeTable(name: String): UnresolvedTableOrView = { + UnresolvedTableOrView( + Seq(name), + "DESCRIBE TABLE", + allowTempView = true, + UnresolvedTableOrViewSearchPathMode.QueryLike) + } test("SPARK-17328: Fix NPE with EXPLAIN DESCRIBE TABLE") { comparePlans(parsePlan("describe t"), DescribeRelation( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = false)) + unresolvedDescribeTable("t"), isExtended = false)) comparePlans(parsePlan("describe table t"), DescribeRelation( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = false)) + unresolvedDescribeTable("t"), isExtended = false)) comparePlans(parsePlan("describe table extended t"), DescribeRelation( - UnresolvedTableOrView(Seq("t"), 
"DESCRIBE TABLE", true), isExtended = true)) + unresolvedDescribeTable("t"), isExtended = true)) comparePlans(parsePlan("describe table formatted t"), DescribeRelation( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = true)) + unresolvedDescribeTable("t"), isExtended = true)) } test("describe table with partition spec") { comparePlans(parsePlan("DESCRIBE TABLE t PARTITION (ds='2024-01-01')"), DescribeTablePartition( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedPartitionSpec(Map("ds" -> "2024-01-01")), isExtended = false)) } @@ -53,38 +60,38 @@ class DescribeTableParserSuite extends SharedSparkSession with AnalysisTest { test("describe table column") { comparePlans(parsePlan("DESCRIBE t col"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("col")), isExtended = false)) comparePlans(parsePlan("DESCRIBE t `abc.xyz`"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("abc.xyz")), isExtended = false)) comparePlans(parsePlan("DESCRIBE t abc.xyz"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("abc", "xyz")), isExtended = false)) comparePlans(parsePlan("DESCRIBE t `a.b`.`x.y`"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("a.b", "x.y")), isExtended = false)) comparePlans(parsePlan("DESCRIBE TABLE t col"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("col")), isExtended = false)) comparePlans(parsePlan("DESCRIBE TABLE EXTENDED t col"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("col")), 
isExtended = true)) comparePlans(parsePlan("DESCRIBE TABLE FORMATTED t col"), DescribeColumn( - UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), + unresolvedDescribeTable("t"), UnresolvedAttribute(Seq("col")), isExtended = true)) From 94817ffc46b6b10a493ef64f8f3758117908fc7a Mon Sep 17 00:00:00 2001 From: srielau Date: Mon, 27 Apr 2026 17:34:37 +0000 Subject: [PATCH 6/9] Trigger CI From fce8ca40844cd5e46d98fc88ca91dd9975880c07 Mon Sep 17 00:00:00 2001 From: srielau Date: Mon, 27 Apr 2026 18:29:44 +0000 Subject: [PATCH 7/9] Trigger CI From 4edb448dbf88d34f2119c3e9596ae68f4969bc81 Mon Sep 17 00:00:00 2001 From: srielau Date: Mon, 27 Apr 2026 19:23:23 +0000 Subject: [PATCH 8/9] [SPARK-56605][SQL] Drop captured SQLConf from resolution classes; use thread-local Per review feedback (vladimirg-db): SQLConf is a thread-local, and view/UDF resolution explicitly nests new SQLConfs onto that thread-local via SQLConf.withExistingConf(...). Capturing a conf as a constructor field on Resolver / FunctionResolution / RelationResolution makes those classes read a stale conf during nested view-body / SQL-function resolution. This removes the captured conf: - FunctionResolution: drop conf constructor param; mix in SQLConfHelper so conf.pathEnabled / conf.prioritizeSystemCatalog read SQLConf.get (live). - RelationResolution: drop sessionConf constructor param and the override def conf; SQLConfHelper.conf = SQLConf.get covers all reads. - Resolver: drop conf constructor param; replace conf.maxToStringFields call sites with SQLConf.get.maxToStringFields. - Analyzer: stop threading sessionConf / resolutionConf into the resolution objects' constructors. Keep Analyzer.sessionConf and the SQLConf.withExistingConf(c) wrap in executeAndCheck/executeSameContext -- that is the legitimate mechanism that establishes the active conf for the whole analyzer run; nested view/UDF blocks correctly push their own conf on top. 
- TimezoneAwareExpressionResolverSuite: drop the now-removed third arg to new FunctionResolution(...). --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++------- .../spark/sql/catalyst/analysis/FunctionResolution.scala | 7 +++---- .../spark/sql/catalyst/analysis/RelationResolution.scala | 6 +----- .../spark/sql/catalyst/analysis/resolver/Resolver.scala | 7 +++---- .../resolver/TimezoneAwareExpressionResolverSuite.scala | 3 +-- 5 files changed, 10 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b676160356ef9..08dcdc36d82e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -300,15 +300,10 @@ class Analyzer( extends RuleExecutor[LogicalPlan] with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper { - /** Conf to use for path-based resolution and error messages; uses session conf when available. 
*/ - private[sql] def resolutionConf: SQLConf = sessionConf.getOrElse(SQLConf.get) - - private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog private val relationResolution = - new RelationResolution(catalogManager, sharedRelationCache, sessionConf) - private val functionResolution = new FunctionResolution(catalogManager, relationResolution, - resolutionConf) + new RelationResolution(catalogManager, sharedRelationCache) + private val functionResolution = new FunctionResolution(catalogManager, relationResolution) override protected def validatePlanChanges( previousPlan: LogicalPlan, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index e0272e219df04..27937e3c775b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.SparkThrowable import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -65,9 +65,8 @@ import org.apache.spark.sql.types._ class FunctionResolution( override val catalogManager: CatalogManager, - relationResolution: RelationResolution, - conf: org.apache.spark.sql.internal.SQLConf) - extends DataTypeErrorsBase with LookupCatalog with Logging { + relationResolution: RelationResolution) + extends DataTypeErrorsBase with LookupCatalog with SQLConfHelper with Logging { private val v1SessionCatalog = catalogManager.v1SessionCatalog 
private val trimWarningEnabled = new AtomicBoolean(true) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 64773da917c96..333366927723d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -49,8 +49,7 @@ import org.apache.spark.util.ArrayImplicits._ class RelationResolution( override val catalogManager: CatalogManager, - sharedRelationCache: RelationCache, - sessionConf: Option[SQLConf] = None) + sharedRelationCache: RelationCache) extends DataTypeErrorsBase with Logging with LookupCatalog @@ -58,9 +57,6 @@ class RelationResolution( type CacheKey = (Seq[String], Option[TimeTravelSpec]) - /** Matches [[Analyzer.resolutionConf]] when the analyzer passes its session conf through. */ - override def conf: SQLConf = sessionConf.getOrElse(SQLConf.get) - val v1SessionCatalog = catalogManager.v1SessionCatalog private def relationCache: mutable.Map[CacheKey, LogicalPlan] = AnalysisContext.get.relationCache diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala index 5bb92d1366925..4b7ed50181874 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala @@ -85,7 +85,6 @@ class Resolver( override val extensions: Seq[ResolverExtension] = Seq.empty, metadataResolverExtensions: Seq[ResolverExtension] = Seq.empty, externalRelationResolution: Option[RelationResolution] = None, - conf: SQLConf = SQLConf.get, extendedRewriteRules: Seq[Rule[LogicalPlan]] = Seq.empty, tracker: Option[QueryPlanningTracker] 
= None) extends LogicalPlanResolver @@ -106,7 +105,7 @@ class Resolver( private val relationResolution = externalRelationResolution.getOrElse { Resolver.createRelationResolution(catalogManager, sharedRelationCache) } - private val functionResolution = new FunctionResolution(catalogManager, relationResolution, conf) + private val functionResolution = new FunctionResolution(catalogManager, relationResolution) private val expressionResolver = new ExpressionResolver(this, functionResolution, planLogger) private val aggregateResolver = new AggregateResolver(this, expressionResolver) private val expressionIdAssigner = expressionResolver.getExpressionIdAssigner @@ -845,7 +844,7 @@ class Resolver( messageParameters = Map( "missingAttributes" -> makeCommaSeparatedExpressionString(missingInput.toSeq), "input" -> makeCommaSeparatedExpressionString(inputSet.toSeq), - "operator" -> operator.simpleString(conf.maxToStringFields), + "operator" -> operator.simpleString(SQLConf.get.maxToStringFields), "operation" -> makeCommaSeparatedExpressionString(attributesWithSameName.toSeq) ) ) @@ -855,7 +854,7 @@ class Resolver( messageParameters = Map( "missingAttributes" -> makeCommaSeparatedExpressionString(missingInput.toSeq), "input" -> makeCommaSeparatedExpressionString(inputSet.toSeq), - "operator" -> operator.simpleString(conf.maxToStringFields) + "operator" -> operator.simpleString(SQLConf.get.maxToStringFields) ) ) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala index 1f4b3dea81b31..f54ab9e4e0ddd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolverSuite.scala @@ -39,8 +39,7 @@ class 
TimezoneAwareExpressionResolverSuite extends SparkFunSuite { functionResolution = new FunctionResolution( catalogManager, - Resolver.createRelationResolution(catalogManager), - org.apache.spark.sql.internal.SQLConf.get), + Resolver.createRelationResolution(catalogManager)), planLogger = new PlanLogger ) { override def resolve(expression: Expression): Expression = resolvedExpression From 42531b9ec8dd80362f7f6968b105a88879a12980 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 28 Apr 2026 07:05:04 -0700 Subject: [PATCH 9/9] [SPARK-56605][SQL] Align resolver error search paths with analysis context Use AnalysisContext-aware catalog path in single-pass unresolved relation errors and reuse function lookup path entries for unresolved routine errors, keeping error search paths consistent with actual lookup semantics. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 ++---- .../spark/sql/catalyst/analysis/FunctionResolution.scala | 2 +- .../spark/sql/catalyst/analysis/resolver/Resolver.scala | 8 ++++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 08dcdc36d82e0..4601d7f5b1ac5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2100,10 +2100,8 @@ class Analyzer( throw QueryCompilationErrors.notAScalarFunctionError(nameParts.mkString("."), f) case FunctionType.NotFound => - val catalogPath = catalogPathForError - val searchPath = catalogManager - .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) - .map(_.quoted) + val searchPath = + functionResolution.sqlResolutionPathEntriesForAnalysis.map(_.quoted) throw QueryCompilationErrors.unresolvedRoutineError( nameParts, searchPath, diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index 27937e3c775b4..8f8c77f38feac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -101,7 +101,7 @@ class FunctionResolution( * directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays * aligned with relation order. */ - private def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = { + private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = { AnalysisContext.get.resolutionPathEntries match { case Some(entries) if conf.pathEnabled => entries case _ => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala index 4b7ed50181874..aaf7117ef4e8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.analysis.{ withPosition, + AnalysisContext, AnalysisErrorAt, CleanupAliases, FunctionResolution, @@ -583,8 +584,11 @@ class Resolver( relationsWithResolvedMetadata case None => val multipartId = unresolvedRelation.multipartIdentifier - val catalogPath = (catalogManager.currentCatalog.name() +: - catalogManager.currentNamespace).toSeq + val catalogPath = { + val ctx = AnalysisContext.get.catalogAndNamespace + if (ctx.nonEmpty) ctx + else (catalogManager.currentCatalog.name() +: catalogManager.currentNamespace).toSeq + } val searchPath = 
catalogManager .sqlResolutionPathEntries(catalogPath.head, catalogPath.tail.toSeq) .map(toSQLId)