@@ -2102,7 +2102,7 @@ class Analyzer(
         throw QueryCompilationErrors.expectPersistentFuncError(
           nameParts.head, cmd, mismatchHint, u)
       } else {
-        ResolvedNonPersistentFunc(nameParts.head, V1Function(info))
+        ResolvedNonPersistentFunc(nameParts.head, V1Function.metadataOnly(info))
       }
     }.getOrElse {
       val CatalogAndIdentifier(catalog, ident) =
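Note on `V1Function.metadataOnly`: the reworked `V1Function` itself is not part of this diff. The following is a hedged sketch inferred from its call sites here (the real class lives in `org.apache.spark.sql.internal.connector` and also implements the DSv2 `UnboundFunction` interface, elided below): eager `ExpressionInfo`, a builder factory that runs at most once on the first `invoke()`, an eager variant for registry hits, and a metadata-only variant for commands that never evaluate the function.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}

// Sketch only - not the PR's actual code. The field names and companion
// methods are assumptions based on how V1Function is used in this diff.
case class V1FunctionSketch(
    info: ExpressionInfo,
    builderFactory: () => Seq[Expression] => Expression) {

  // The factory runs at most once; later invocations reuse the builder.
  private lazy val builder: Seq[Expression] => Expression = builderFactory()

  def invoke(arguments: Seq[Expression]): Expression = builder(arguments)
}

object V1FunctionSketch {
  // Eager variant: used when (info, builder) is already in the registry.
  def apply(
      info: ExpressionInfo,
      builder: Seq[Expression] => Expression): V1FunctionSketch =
    V1FunctionSketch(info, () => builder)

  // Metadata-only variant: enough for DESCRIBE FUNCTION; invoking is a bug,
  // so the factory fails fast instead of silently loading resources.
  def metadataOnly(info: ExpressionInfo): V1FunctionSketch =
    V1FunctionSketch(info, () => throw new IllegalStateException(
      s"Function ${info.getName} was loaded for metadata only"))
}
```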
@@ -90,11 +90,16 @@ trait FunctionRegistryBase[T] {
   /* List all of the registered function names. */
   def listFunction(): Seq[FunctionIdentifier]
 
-  /* Get the class of the registered function by specified name. */
-  def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo]
+  /* Get both ExpressionInfo and FunctionBuilder in a single lookup. */
+  def lookupFunctionEntry(name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)]
+
+  /* Get the ExpressionInfo of the registered function by specified name. */
+  def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] =
+    lookupFunctionEntry(name).map(_._1)
 
   /* Get the builder of the registered function by specified name. */
-  def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder]
+  def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] =
+    lookupFunctionEntry(name).map(_._2)
 
   /** Drop a function and return whether the function existed. */
   def dropFunction(name: FunctionIdentifier): Boolean
@@ -245,13 +250,9 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
     functionBuilders.iterator.map(_._1).toList
   }
 
-  override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized {
-    functionBuilders.get(normalizeFuncName(name)).map(_._1)
-  }
-
-  override def lookupFunctionBuilder(
-      name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
-    functionBuilders.get(normalizeFuncName(name)).map(_._2)
+  override def lookupFunctionEntry(
+      name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)] = synchronized {
+    functionBuilders.get(normalizeFuncName(name))
   }
 
   override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {

@@ -281,11 +282,8 @@ trait EmptyFunctionRegistryBase[T] extends FunctionRegistryBase[T] {
     throw SparkUnsupportedOperationException()
   }
 
-  override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = {
-    throw SparkUnsupportedOperationException()
-  }
-
-  override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] = {
+  override def lookupFunctionEntry(
+      name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)] = {
     throw SparkUnsupportedOperationException()
   }
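Design note on the registry change above: `SimpleFunctionRegistryBase` keeps `(ExpressionInfo, FunctionBuilder)` pairs in one map guarded by `synchronized`, so folding the two lookups into `lookupFunctionEntry` does more than halve the map probes: a caller that needs both pieces now reads them from a single consistent snapshot, instead of two separate lookups that a concurrent `dropFunction` or re-registration could interleave. A minimal self-contained sketch of that shape (simplified types, not Spark code):

```scala
import scala.collection.mutable

// Toy registry: one synchronized map access returns a consistent
// (info, builder) pair; the narrower lookups derive from it, as in the PR.
class TinyRegistry[I, B] {
  private val entries = mutable.Map.empty[String, (I, B)]

  def register(name: String, info: I, builder: B): Unit =
    synchronized { entries(name) = (info, builder) }

  // Single synchronized read: info and builder come from the same snapshot.
  def lookupFunctionEntry(name: String): Option[(I, B)] =
    synchronized { entries.get(name) }

  def lookupFunction(name: String): Option[I] =
    lookupFunctionEntry(name).map(_._1)

  def lookupFunctionBuilder(name: String): Option[B] =
    lookupFunctionEntry(name).map(_._2)
}
```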
@@ -52,11 +52,11 @@ class FunctionResolution(
     val CatalogAndIdentifier(catalog, ident) =
       relationResolution.expandIdentifier(u.nameParts)
     catalog.asFunctionCatalog.loadFunction(ident) match {
-      case V1Function(_) =>
-        // this triggers the second time v1 function resolution but should be cheap
-        // (no RPC to external catalog), since the metadata has been already cached
-        // in FunctionRegistry during the above `catalog.loadFunction` call.
-        resolveV1Function(ident.asFunctionIdentifier, u.arguments, u)
+      case v1Func: V1Function =>
+        // V1Function has a lazy builder - invoke() triggers resource loading
+        // and builder creation only on first invocation
+        val func = v1Func.invoke(u.arguments)
+        validateFunction(func, u.arguments.length, u)
       case unboundV2Func =>
         resolveV2Function(unboundV2Func, u.arguments, u)
     }
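Reusing the hedged `V1FunctionSketch` from above, the new match arm boils down to: construct once, `invoke` to force the builder, validate the result. A small runnable illustration of the at-most-once factory semantics (the UDF class name is made up):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}

var loads = 0
val f = V1FunctionSketch(
  new ExpressionInfo("com.example.MyUdf", "my_udf"),  // hypothetical UDF
  () => {
    loads += 1  // stands in for loadFunctionResources + registerFunction
    (args: Seq[Expression]) => args.head
  })

f.invoke(Seq(Literal(1)))
f.invoke(Seq(Literal(2)))
assert(loads == 1)  // the builder factory ran only on the first invoke()
```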
@@ -118,14 +118,6 @@ class FunctionResolution(
     }
   }
 
-  private def resolveV1Function(
-      ident: FunctionIdentifier,
-      arguments: Seq[Expression],
-      u: UnresolvedFunction): Expression = {
-    val func = v1SessionCatalog.resolvePersistentFunction(ident, arguments)
-    validateFunction(func, arguments.length, u)
-  }
-
   private def validateFunction(
       func: Expression,
       numArgs: Int,
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder

@@ -49,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.metricview.util.MetricViewPlanner
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
@@ -2153,122 +2153,121 @@ class SessionCatalog(
   }
 
   /**
-   * Look up the `ExpressionInfo` of the given function by name if it's a persistent function.
-   * This supports both scalar and table functions.
+   * Look up a persistent function's ExpressionInfo by name (for DESCRIBE FUNCTION).
+   * This only fetches metadata without loading resources or creating builders.
    */
   def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
     val qualifiedIdent = qualifyIdentifier(name)
-    val db = qualifiedIdent.database.get
-    val funcName = qualifiedIdent.funcName
+    // Check if already cached in either registry
     functionRegistry.lookupFunction(qualifiedIdent)
       .orElse(tableFunctionRegistry.lookupFunction(qualifiedIdent))
       .getOrElse {
-        requireDbExists(db)
-        if (externalCatalog.functionExists(db, funcName)) {
-          val metadata = externalCatalog.getFunction(db, funcName)
-          if (metadata.isUserDefinedFunction) {
-            UserDefinedFunction.fromCatalogFunction(metadata, parser).toExpressionInfo
-          } else {
-            makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedIdent))
-          }
+        val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+        if (funcMetadata.isUserDefinedFunction) {
+          UserDefinedFunction.fromCatalogFunction(funcMetadata, parser).toExpressionInfo
         } else {
-          failFunctionLookup(name)
+          makeExprInfoForHiveFunction(funcMetadata)
         }
       }
   }

   /**
-   * Look up a persistent scalar function by name and resolves it to an Expression.
+   * Load a persistent scalar function by name.
+   * Returns V1Function with:
+   *   - Eager info (from cache or catalog fetch, no resource loading)
+   *   - Lazy builder (resource loading only on first invoke)
+   *
+   * This matches V1 behavior where DESCRIBE doesn't load resources.
    */
-  def resolvePersistentFunction(
-      name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
-    resolvePersistentFunctionInternal[Expression](
-      name,
-      arguments,
-      functionRegistry,
-      registerHiveFunc = func =>
-        registerFunction(
-          func,
-          overrideIfExists = false,
-          registry = functionRegistry,
-          functionBuilder = makeFunctionBuilder(func)
-        ),
-      registerUserDefinedFunc = function => {
-        val builder = makeUserDefinedScalarFuncBuilder(function)
-        registerUserDefinedFunction[Expression](
-          function = function,
-          overrideIfExists = false,
-          registry = functionRegistry,
-          functionBuilder = builder)
-      }
-    )
+  def loadPersistentScalarFunction(name: FunctionIdentifier): V1Function = {
+    val qualifiedIdent = qualifyIdentifier(name)
+
+    // Check cache first (no synchronization needed for reads)
+    functionRegistry.lookupFunctionEntry(qualifiedIdent) match {
+      case Some((cachedInfo, cachedBuilder)) =>
+        // Already cached - return with eager builder
+        V1Function(cachedInfo, cachedBuilder)
+
+      case None =>
+        // Fetch metadata eagerly (no resource loading yet)
+        val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+        val info = if (funcMetadata.isUserDefinedFunction) {
+          UserDefinedFunction.fromCatalogFunction(funcMetadata, parser).toExpressionInfo
+        } else {
+          makeExprInfoForHiveFunction(funcMetadata)
+        }
+
+        // Builder factory - loads resources only on first invoke()
+        val builderFactory: () => FunctionBuilder = () => synchronized {
+          // Re-check cache (another thread may have loaded it)
+          functionRegistry.lookupFunctionBuilder(qualifiedIdent).getOrElse {
+            if (funcMetadata.isUserDefinedFunction) {
+              val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
+              registerUserDefinedFunction[Expression](
+                udf,
+                overrideIfExists = false,
+                functionRegistry,
+                makeUserDefinedScalarFuncBuilder(udf))
+            } else {
+              loadFunctionResources(funcMetadata.resources)
+              registerFunction(
+                funcMetadata,
+                overrideIfExists = false,
+                functionRegistry,
+                makeFunctionBuilder(funcMetadata))
+            }
+            functionRegistry.lookupFunctionBuilder(qualifiedIdent).get
+          }
+        }
+
+        V1Function(info, builderFactory)
+    }
   }
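The `builderFactory` above is double-checked caching with the `FunctionRegistry` as the memo table: an unsynchronized `lookupFunctionEntry` on the fast path, then a `synchronized` re-check of `lookupFunctionBuilder` before loading resources, because (as the removed comment further down notes) calling `registerFunction` twice with `overrideIfExists = false` throws a `FunctionAlreadyExistsException`. Stripped of the catalog details, the concurrency shape is the following (a generic sketch, not PR code):

```scala
import java.util.concurrent.ConcurrentHashMap

// Generic sketch of the double-checked caching used by builderFactory:
// fast unsynchronized read, then a synchronized re-check so the expensive
// load runs at most once per key even under concurrent first invocations.
class OnceLoader[K, V](expensiveLoad: K => V) {
  private val cache = new ConcurrentHashMap[K, V]()

  def get(key: K): V =
    Option(cache.get(key)).getOrElse {      // fast path: no lock taken
      synchronized {
        Option(cache.get(key)).getOrElse {  // re-check under the lock
          val v = expensiveLoad(key)        // runs at most once per key
          cache.put(key, v)
          v
        }
      }
    }
}
```

The loser of a race re-reads the cache inside the lock and skips `expensiveLoad`, which is exactly why `builderFactory` re-checks `lookupFunctionBuilder` before registering.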

   /**
    * Look up a persistent table function by name and resolves it to a LogicalPlan.
    */
   def resolvePersistentTableFunction(
       name: FunctionIdentifier,
-      arguments: Seq[Expression]): LogicalPlan = {
-    resolvePersistentFunctionInternal[LogicalPlan](
-      name,
-      arguments,
-      tableFunctionRegistry,
-      // We don't support persistent Hive table functions yet.
-      registerHiveFunc = (func: CatalogFunction) => failFunctionLookup(name),
-      registerUserDefinedFunc = function => {
-        val builder = makeUserDefinedTableFuncBuilder(function)
-        registerUserDefinedFunction[LogicalPlan](
-          function = function,
-          overrideIfExists = false,
-          registry = tableFunctionRegistry,
-          functionBuilder = builder)
+      arguments: Seq[Expression]): LogicalPlan = synchronized {
+    val qualifiedIdent = qualifyIdentifier(name)
+    if (tableFunctionRegistry.functionExists(qualifiedIdent)) {
+      // Already cached
+      tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
+    } else {
+      // Load from catalog
+      val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+      if (!funcMetadata.isUserDefinedFunction) {
+        // Hive table functions are not supported
+        failFunctionLookup(qualifiedIdent)
       }
-    )
+      val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
+      registerUserDefinedFunction[LogicalPlan](
+        udf,
+        overrideIfExists = false,
+        tableFunctionRegistry,
+        makeUserDefinedTableFuncBuilder(udf))
+      tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
+    }
   }

-  private def resolvePersistentFunctionInternal[T](
-      name: FunctionIdentifier,
-      arguments: Seq[Expression],
-      registry: FunctionRegistryBase[T],
-      registerHiveFunc: CatalogFunction => Unit,
-      registerUserDefinedFunc: UserDefinedFunction => Unit): T = {
-    // `synchronized` is used to prevent multiple threads from concurrently resolving the
-    // same function that has not yet been loaded into the function registry. This is needed
-    // because calling `registerFunction` twice with `overrideIfExists = false` can lead to
-    // a FunctionAlreadyExistsException.
-    synchronized {
-      val qualifiedIdent = qualifyIdentifier(name)
-      val db = qualifiedIdent.database.get
-      val funcName = qualifiedIdent.funcName
-      if (registry.functionExists(qualifiedIdent)) {
-        // This function has been already loaded into the function registry.
-        registry.lookupFunction(qualifiedIdent, arguments)

[Review thread anchored here]
Member: I previously assumed the second round would go here, seems not?
Contributor Author: v2 functions do not support table functions yet, so scalar and table functions have different code paths:

-      } else {
-        // The function has not been loaded to the function registry, which means
-        // that the function is a persistent function (if it actually has been registered
-        // in the metastore). We need to first put the function in the function registry.
-        val catalogFunction = try {
-          externalCatalog.getFunction(db, funcName)
-        } catch {
-          case _: AnalysisException => failFunctionLookup(qualifiedIdent)
-        }
-        // Please note that qualifiedName is provided by the user. However,
-        // catalogFunction.identifier.unquotedString is returned by the underlying
-        // catalog. So, it is possible that qualifiedName is not exactly the same as
-        // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
-        // At here, we preserve the input from the user.
-        val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
-        if (!catalogFunction.isUserDefinedFunction) {
-          loadFunctionResources(catalogFunction.resources)
-          registerHiveFunc(funcMetadata)
-        } else {
-          val function = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
-          registerUserDefinedFunc(function)
-        }
-        // Now, we need to create the Expression.
-        registry.lookupFunction(qualifiedIdent, arguments)
-      }
+  /**
+   * Fetch a catalog function from the external catalog.
+   */
+  private def fetchCatalogFunction(qualifiedIdent: FunctionIdentifier): CatalogFunction = {
+    val db = qualifiedIdent.database.get
+    val funcName = qualifiedIdent.funcName
+    requireDbExists(db)
+    try {
+      // Please note that qualifiedIdent is provided by the user. However,
+      // CatalogFunction.identifier is returned by the underlying catalog.
+      // So, it is possible that qualifiedIdent is not exactly the same as
+      // catalogFunction.identifier (difference is on case-sensitivity).
+      // At here, we preserve the input from the user.
+      externalCatalog.getFunction(db, funcName).copy(identifier = qualifiedIdent)
+    } catch {
+      case _: NoSuchPermanentFunctionException | _: NoSuchFunctionException =>
+        failFunctionLookup(qualifiedIdent)
     }
   }

@@ -2290,9 +2289,9 @@ class SessionCatalog(
   def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
     if (name.database.isEmpty) {
       resolveBuiltinOrTempFunction(name.funcName, children)
-        .getOrElse(resolvePersistentFunction(name, children))
+        .getOrElse(loadPersistentScalarFunction(name).invoke(children))
     } else {
-      resolvePersistentFunction(name, children)
+      loadPersistentScalarFunction(name).invoke(children)
     }
   }
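This answers the review question embedded above: for scalar functions there is no second resolution round anymore, because the `V1Function` returned by `loadFunction` already carries its (lazy) builder and `invoke` runs it directly; table functions, which DSv2 does not cover yet, keep the eager register-then-lookup path. Condensed into the two entry points this PR leaves behind (a sketch wiring together APIs from this diff; `catalog` is assumed to be an initialized `SessionCatalog`):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Scalar path: one catalog fetch for metadata; resource loading is
// deferred until the first invoke().
def resolveScalar(
    catalog: SessionCatalog,
    name: FunctionIdentifier,
    args: Seq[Expression]): Expression =
  catalog.loadPersistentScalarFunction(name).invoke(args)

// Table path: still eager - register into tableFunctionRegistry,
// then resolve through it.
def resolveTable(
    catalog: SessionCatalog,
    name: FunctionIdentifier,
    args: Seq[Expression]): LogicalPlan =
  catalog.resolvePersistentTableFunction(name, args)
```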
@@ -38,7 +38,6 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, DefaultVa
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._

@@ -671,7 +670,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
       throw SparkUnsupportedOperationException()
     }
     override def loadFunction(ident: Identifier): UnboundFunction = {
-      V1Function(v1Catalog.lookupPersistentFunction(ident.asFunctionIdentifier))
+      v1Catalog.loadPersistentScalarFunction(ident.asFunctionIdentifier)
     }
     override def functionExists(ident: Identifier): Boolean = {
       v1Catalog.isPersistentFunction(ident.asFunctionIdentifier)