From 1759ca21b95f090483bf74e0b3f7e8ef20fd5640 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 23 Feb 2026 10:23:24 +0900 Subject: [PATCH] Rewrite spark.catalog in SQL DDL formatted text --- docs/sql-ref-ansi-compliance.md | 1 + .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 1 + .../sql/catalyst/parser/SqlBaseParser.g4 | 3 + .../org/apache/spark/sql/SparkSession.scala | 32 + .../spark/sql/catalog/CatalogImpl.scala | 633 ++++++++++++ .../sql/connector/catalog/CatalogV2Util.scala | 19 +- .../parser/TableIdentifierParserSuite.scala | 1 + .../apache/spark/sql/connect/Catalog.scala | 729 -------------- .../spark/sql/connect/SparkSession.scala | 32 +- .../apache/spark/sql/classic/Catalog.scala | 946 +----------------- .../sql/classic/ClassicCatalogHelper.scala | 91 ++ .../spark/sql/classic/SparkSession.scala | 20 +- .../spark/sql/execution/CacheManager.scala | 22 + .../spark/sql/execution/SparkSqlParser.scala | 7 + .../command/ShowCachedTablesCommand.scala | 36 + .../spark/sql/execution/command/cache.scala | 3 +- .../spark/sql/execution/command/tables.scala | 11 +- .../datasources/v2/CatalogDDLHelpers.scala | 263 +++++ .../datasources/v2/ShowTablesExec.scala | 24 +- .../sql/execution/SparkSqlParserSuite.scala | 4 + .../spark/sql/internal/CatalogSuite.scala | 37 +- 21 files changed, 1203 insertions(+), 1712 deletions(-) create mode 100644 sql/api/src/main/scala/org/apache/spark/sql/catalog/CatalogImpl.scala delete mode 100644 sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/classic/ClassicCatalogHelper.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/ShowCachedTablesCommand.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogDDLHelpers.scala diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index c40d9c95d61c2..b11b78d990d1b 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -440,6 +440,7 @@ Below is a list of all the keywords in Spark SQL. |BY|non-reserved|non-reserved|reserved| |BYTE|non-reserved|non-reserved|non-reserved| |CACHE|non-reserved|non-reserved|non-reserved| +|CACHED|non-reserved|non-reserved|non-reserved| |CALL|reserved|non-reserved|reserved| |CALLED|non-reserved|non-reserved|non-reserved| |CASCADE|non-reserved|non-reserved|non-reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 177695e38aacc..16572e74bd61b 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -159,6 +159,7 @@ BUCKETS: 'BUCKETS'; BY: 'BY'; BYTE: 'BYTE'; CACHE: 'CACHE'; +CACHED: 'CACHED'; CALL: 'CALL'; CALLED: 'CALLED'; CASCADE: 'CASCADE'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index d61dd137ec5aa..04675f4a90148 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -375,6 +375,7 @@ statement | SHOW PROCEDURES ((FROM | IN) identifierReference)? #showProcedures | SHOW CREATE TABLE identifierReference (AS SERDE)? #showCreateTable | SHOW CURRENT namespace #showCurrentNamespace + | SHOW CACHED TABLES #showCachedTables | SHOW CATALOGS (LIKE? pattern=stringLit)? #showCatalogs | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) PROCEDURE identifierReference #describeProcedure @@ -1902,6 +1903,7 @@ ansiNonReserved | BY | BYTE | CACHE + | CACHED | CALLED | CASCADE | CATALOG @@ -2270,6 +2272,7 @@ nonReserved | BY | BYTE | CACHE + | CACHED | CALL | CALLED | CASCADE diff --git a/sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala index 9c6e3bdb90786..76dc50104a5c9 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -445,6 +445,38 @@ abstract class SparkSession extends Serializable with Closeable { */ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[lang.Long] + /** + * Parses a multipart identifier (e.g. table or namespace name) into parts. + * Used by the Catalog API when building SQL DDL strings. + * + * @since 4.2.0 + */ + def parseMultipartIdentifier(identifier: String): Seq[String] + + /** + * Quotes an identifier part for safe use in SQL strings (e.g. backtick-wrapped). + * Used by the Catalog API when building SQL DDL strings. + * + * @since 4.2.0 + */ + def quoteIdentifier(identifier: String): String + + /** + * Returns the current default database (namespace) in this session. + * Used by the Catalog API. + * + * @since 4.2.0 + */ + def currentDatabase: String + + /** + * Returns the current default catalog in this session. + * Used by the Catalog API. + * + * @since 4.2.0 + */ + def currentCatalog(): String + /* ------------------------- * | Catalog-related methods | * ------------------------- */ diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalog/CatalogImpl.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalog/CatalogImpl.scala new file mode 100644 index 0000000000000..cb73e94bcd8a3 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalog/CatalogImpl.scala @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog + +import java.util.Locale + +import scala.util.control.NonFatal + +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel + +/** + * Optional helper for catalog operations that can be implemented without running a Spark job + * (e.g. by using session catalog and analysis only). When provided, the unified Catalog + * implementation uses it for tableExists, getTable, getFunction, listColumns, functionExists, + * and isCached to preserve 0-job behavior. When not provided (e.g. Connect), those operations + * use SQL DDL and may trigger a job. + * + * @since 4.2.0 + */ +trait CatalogHelper { + + /** True if table/view exists at the given multipart identifier (0-job when possible). */ + def tableExists(nameParts: Seq[String]): Boolean + + /** Table metadata for the given multipart identifier (0-job when possible). */ + def getTableMetadata(nameParts: Seq[String]): Table + + /** Function metadata for the given multipart identifier (0-job when possible). */ + def getFunctionMetadata(nameParts: Seq[String]): Function + + /** Column list for the table/view at the given multipart identifier (0-job when possible). */ + def listColumns(nameParts: Seq[String]): Seq[Column] + + /** True if function exists at the given multipart identifier (0-job when possible). */ + def functionExists(nameParts: Seq[String]): Boolean + + /** True if table/view at the given multipart identifier is cached (0-job when possible). */ + def isCached(nameParts: Seq[String]): Boolean +} + +/** + * Unified Catalog implementation that uses SQL DDL strings and the public SparkSession API only. + * When a [[CatalogHelper]] is provided (e.g. by Classic), operations that support 0-job paths + * use the helper to avoid extra jobs; otherwise (e.g. Connect) those operations use SQL. + * + * @param session Spark session (Classic or Connect) + * @param helper Optional helper for 0-job catalog lookups; None for SQL-only path + * @since 4.2.0 + */ +class CatalogImpl(session: SparkSession, helper: Option[CatalogHelper]) extends Catalog { + + private def sql(str: String): DataFrame = session.sql(str) + private def parseIdent(name: String): Seq[String] = session.parseMultipartIdentifier(name) + private def quote(name: String): String = session.quoteIdentifier(name) + private def quoted(nameParts: Seq[String]): String = nameParts.map(quote).mkString(".") + private def escapeSingleQuotes(s: String): String = s.replace("'", "''") + + private def makeDataset[T: org.apache.spark.sql.Encoder](data: Seq[T]): Dataset[T] = + session.createDataset(data) + + // Encoders for catalog types (DefinedByConstructorParams) + private implicit val databaseEncoder: org.apache.spark.sql.Encoder[Database] = + ScalaReflection.encoderFor(ScalaReflection.localTypeOf[Database]) + .asInstanceOf[org.apache.spark.sql.Encoder[Database]] + private implicit val tableEncoder: org.apache.spark.sql.Encoder[Table] = + ScalaReflection.encoderFor(ScalaReflection.localTypeOf[Table]) + .asInstanceOf[org.apache.spark.sql.Encoder[Table]] + private implicit val functionEncoder: org.apache.spark.sql.Encoder[Function] = + ScalaReflection.encoderFor(ScalaReflection.localTypeOf[Function]) + .asInstanceOf[org.apache.spark.sql.Encoder[Function]] + private implicit val columnEncoder: org.apache.spark.sql.Encoder[Column] = + ScalaReflection.encoderFor(ScalaReflection.localTypeOf[Column]) + .asInstanceOf[org.apache.spark.sql.Encoder[Column]] + private implicit val catalogMetadataEncoder: org.apache.spark.sql.Encoder[CatalogMetadata] = + ScalaReflection.encoderFor(ScalaReflection.localTypeOf[CatalogMetadata]) + .asInstanceOf[org.apache.spark.sql.Encoder[CatalogMetadata]] + + override def currentDatabase: String = session.currentDatabase + + override def setCurrentDatabase(dbName: String): Unit = { + val quotedNs = QuotingUtils.quoteNameParts(parseIdent(dbName)) + // USE NAMESPACE sets multi-part namespace in current catalog; plain USE is catalog.namespace + sql(s"USE NAMESPACE $quotedNs") + } + + override def listDatabases(): Dataset[Database] = listDatabasesInternal(None) + + override def listDatabases(pattern: String): Dataset[Database] = + listDatabasesInternal(Some(pattern)) + + private def listDatabasesInternal(patternOpt: Option[String]): Dataset[Database] = { + val catalogName = session.currentCatalog() + val sqlStr = patternOpt match { + case Some(p) => s"SHOW NAMESPACES LIKE '${escapeSingleQuotes(p)}'" + case None => "SHOW NAMESPACES" + } + val rows = sql(sqlStr).collect() + val databases = rows.map { row => + val rawName = row.getString(0) + // Only quote if needed; if SHOW NAMESPACES already returned quoted names, don't double-quote + val name = if (rawName.startsWith("`")) rawName else QuotingUtils.quoteIfNeeded(rawName) + new Database( + name = name, + catalog = catalogName, + description = null, + locationUri = null + ) + } + makeDataset(databases.toSeq) + } + + override def listTables(): Dataset[Table] = listTables(session.currentDatabase) + + @throws[AnalysisException]("database does not exist") + override def listTables(dbName: String): Dataset[Table] = + listTablesInternal(dbName, None) + + @throws[AnalysisException]("database does not exist") + override def listTables(dbName: String, pattern: String): Dataset[Table] = + listTablesInternal(dbName, Some(pattern)) + + private def listTablesInternal(dbName: String, pattern: Option[String]): Dataset[Table] = { + val dbNameParts = parseIdent(dbName) + // When dbName is qualified (e.g. "testcat.my_db"), use first part as catalog for result + val catalogName = if (dbNameParts.length > 1) dbNameParts.head else session.currentCatalog() + val currentNs = session.currentDatabase + val quotedNs = dbNameParts.map(quote).mkString(".") + val sqlStr = pattern match { + case Some(p) => s"SHOW TABLES IN $quotedNs LIKE '${escapeSingleQuotes(p)}'" + case None if quotedNs == currentNs => "SHOW TABLES" + case None => s"SHOW TABLES IN $quotedNs" + } + val rows = sql(sqlStr).collect() + val tables = rows.map { row => + val namespaceName = row.getString(0) + val tableName = row.getString(1) + val isTempView = row.getBoolean(2) + val ns = if (isTempView) { + if (namespaceName.isEmpty) Array.empty[String] else Array(namespaceName) + } else { + parseIdent(namespaceName).toArray + } + // When dbName was qualified (e.g. "testcat.my_db"), namespace from SHOW TABLES may be + // full "testcat.my_db"; use only the namespace within the catalog for resolution. + val nsInCatalog = if (isTempView) ns else { + if (dbNameParts.length > 1 && ns.nonEmpty && ns.head == catalogName) ns.tail else ns + } + val nameParts = if (isTempView) ns :+ tableName else catalogName +: nsInCatalog :+ tableName + helper match { + case Some(h) => + try { + h.getTableMetadata(nameParts.toSeq) + } catch { + case _: Exception => + new Table( + name = tableName, + catalog = if (isTempView) null else catalogName, + namespace = nsInCatalog, + description = null, + tableType = null, + isTemporary = isTempView + ) + } + case None => + new Table( + name = tableName, + catalog = if (isTempView) null else catalogName, + namespace = nsInCatalog, + description = null, + tableType = null, + isTemporary = isTempView + ) + } + } + makeDataset(tables.toSeq) + } + + override def listFunctions(): Dataset[Function] = listFunctions(session.currentDatabase) + + @throws[AnalysisException]("database does not exist") + override def listFunctions(dbName: String): Dataset[Function] = + listFunctionsInternal(dbName, None) + + @throws[AnalysisException]("database does not exist") + override def listFunctions(dbName: String, pattern: String): Dataset[Function] = + listFunctionsInternal(dbName, Some(pattern)) + + private def listFunctionsInternal(dbName: String, pattern: Option[String]): Dataset[Function] = { + val catalogName = session.currentCatalog() + val currentNs = session.currentDatabase + val quotedNs = parseIdent(dbName).map(quote).mkString(".") + val sqlStr = pattern match { + case Some(p) => s"SHOW FUNCTIONS IN $quotedNs LIKE '${escapeSingleQuotes(p)}'" + case None if quotedNs == currentNs => "SHOW FUNCTIONS" + case None => s"SHOW FUNCTIONS IN $quotedNs" + } + val rows = sql(sqlStr).collect() + val nsArray = parseIdent(dbName).toArray + val functions = rows.map { row => + val funcName = row.getString(0) + val parts = try parseIdent(funcName) catch { case NonFatal(_) => Seq(funcName) } + val name = if (parts.nonEmpty) parts.last else funcName + val nameParts = catalogName +: nsArray :+ name + val (namespace, isTemporary) = helper match { + case Some(h) => + try { + val meta = h.getFunctionMetadata(nameParts.toSeq) + (if (meta.isTemporary) Array.empty[String] else nsArray, meta.isTemporary) + } catch { + case NonFatal(_) => + // Temp functions may resolve only by name; try unqualified lookup + try { + val meta = h.getFunctionMetadata(Seq(name)) + if (meta.isTemporary) (Array.empty[String], true) else (nsArray, false) + } catch { case NonFatal(_) => (nsArray, false) } + } + case None => (nsArray, false) + } + new Function( + name = name, + catalog = catalogName, + namespace = namespace, + description = null, + className = null, + isTemporary = isTemporary + ) + } + makeDataset(functions.toSeq) + } + + @throws[AnalysisException]("table does not exist") + override def listColumns(tableName: String): Dataset[Column] = { + val nameParts = parseIdent(tableName) + listColumns(nameParts) + } + + @throws[AnalysisException]("database or table does not exist") + override def listColumns(dbName: String, tableName: String): Dataset[Column] = { + val nameParts = Seq("spark_catalog", dbName, tableName) + listColumns(nameParts) + } + + private def listColumns(nameParts: Seq[String]): Dataset[Column] = { + val columns = helper match { + case Some(h) => h.listColumns(nameParts) + case None => + val quotedTable = quoted(nameParts) + val rows = sql(s"DESCRIBE TABLE $quotedTable").collect() + rows.map { row => + new Column( + name = row.getString(0), + description = null, + dataType = row.getString(1), + nullable = true, + isPartition = false, + isBucket = false, + isCluster = false + ) + }.toSeq + } + makeDataset(columns) + } + + @throws[AnalysisException]("database does not exist") + override def getDatabase(dbName: String): Database = { + val idents = parseIdent(dbName) + // Unqualified name: try session catalog first (so "testdb" returns spark_catalog.testdb when + // both exist), then current catalog. + val identsToDescribe = if (idents.length == 1) { + val sessionQualified = "spark_catalog" +: idents + if (databaseExists(sessionQualified.mkString("."))) sessionQualified + else (session.currentCatalog() +: idents).toSeq + } else { + idents + } + val quotedNs = QuotingUtils.quoteNameParts(identsToDescribe) + val rows = sql(s"DESCRIBE NAMESPACE EXTENDED $quotedNs").collect() + val info = rows.map(row => (row.getString(0), Option(row.getString(1)).getOrElse(""))).toMap + val name = info.getOrElse("Namespace Name", identsToDescribe.last) + val catalog = info.getOrElse("Catalog Name", + if (identsToDescribe.length > 1) identsToDescribe.head else "") + val description = info.get("Comment").orNull + val locationUri = info.get("Location").orNull + new Database( + name = name, + catalog = catalog, + description = description, + locationUri = locationUri) + } + + @throws[AnalysisException]("table does not exist") + override def getTable(tableName: String): Table = { + val nameParts = parseIdent(tableName) + helper match { + case Some(h) => h.getTableMetadata(nameParts) + case None => + val quotedTable = quoted(nameParts) + val rows = sql(s"DESCRIBE TABLE EXTENDED $quotedTable").collect() + val info = rows.map(r => (r.getString(0), Option(r.getString(1)).getOrElse(""))).toMap + val tableType = info.getOrElse("Type", "TABLE") + val isTemp = info.get("Is Temporary").exists(_.equalsIgnoreCase("true")) + val ns = if (nameParts.length > 2) nameParts.drop(1).dropRight(1).toArray + else nameParts.dropRight(1).toArray + new Table( + name = nameParts.last, + catalog = if (nameParts.length > 2) nameParts.head else null, + namespace = ns, + description = info.get("Comment").orNull, + tableType = tableType, + isTemporary = isTemp + ) + } + } + + @throws[AnalysisException]("database or table does not exist") + override def getTable(dbName: String, tableName: String): Table = { + val nameParts = Seq("spark_catalog", dbName, tableName) + helper match { + case Some(h) => h.getTableMetadata(nameParts) + case None => getTable(nameParts.mkString(".")) + } + } + + @throws[AnalysisException]("function does not exist") + override def getFunction(functionName: String): Function = { + val nameParts = parseIdent(functionName) + helper match { + case Some(h) => h.getFunctionMetadata(nameParts) + case None => + val quotedFunc = quoted(nameParts) + sql(s"DESCRIBE FUNCTION EXTENDED $quotedFunc").collect() + new Function( + name = nameParts.last, + catalog = if (nameParts.length > 2) nameParts.head else null, + namespace = if (nameParts.length > 2) nameParts.drop(1).dropRight(1).toArray else null, + description = null, + className = null, + isTemporary = false + ) + } + } + + @throws[AnalysisException]("database or function does not exist") + override def getFunction(dbName: String, functionName: String): Function = { + val nameParts = Seq("spark_catalog", dbName, functionName) + helper match { + case Some(h) => h.getFunctionMetadata(nameParts) + case None => getFunction(s"$dbName.$functionName") + } + } + + override def databaseExists(dbName: String): Boolean = { + try { + val parts = try parseIdent(dbName) catch { case NonFatal(_) => Seq(dbName) } + val (sqlStr, nameToMatch) = if (parts.length == 1) { + ("SHOW NAMESPACES", parts.head) + } else { + val quotedCatalog = quote(parts.head) + (s"SHOW NAMESPACES IN $quotedCatalog", parts.tail.mkString(".")) + } + val rows = sql(sqlStr).collect() + rows.exists(row => row.getString(0).equalsIgnoreCase(nameToMatch)) + } catch { + case _: AnalysisException => false + case NonFatal(_) => false + } + } + + override def tableExists(tableName: String): Boolean = { + val nameParts = parseIdent(tableName) + helper match { + case Some(h) => h.tableExists(nameParts) + case None => + if (nameParts.isEmpty) return false + val nsParts = nameParts.init + val tablePart = nameParts.last + val quotedNs = if (nsParts.isEmpty) session.currentDatabase + else nsParts.map(quote).mkString(".") + val sqlStr = if (nsParts.isEmpty) "SHOW TABLES" else s"SHOW TABLES IN $quotedNs" + val rows = sql(sqlStr).collect() + rows.exists(row => row.getString(1).equalsIgnoreCase(tablePart)) + } + } + + override def tableExists(dbName: String, tableName: String): Boolean = { + tableExists(s"$dbName.$tableName") + } + + override def functionExists(functionName: String): Boolean = { + val nameParts = parseIdent(functionName) + helper match { + case Some(h) => h.functionExists(nameParts) + case None => + try { + val quotedFunc = quoted(nameParts) + sql(s"DESCRIBE FUNCTION $quotedFunc").collect() + true + } catch { case _: AnalysisException => false } + } + } + + override def functionExists(dbName: String, functionName: String): Boolean = { + functionExists(s"$dbName.$functionName") + } + + override def createTable(tableName: String, path: String): DataFrame = { + createTable(tableName, path, "parquet") + } + + override def createTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, source, Map("path" -> path)) + } + + override def createTable( + tableName: String, + source: String, + options: Map[String, String]): DataFrame = { + createTable(tableName, source, new StructType, options) + } + + override def createTable( + tableName: String, + source: String, + description: String, + options: Map[String, String]): DataFrame = { + createTable(tableName, source, new StructType, description, options) + } + + override def createTable( + tableName: String, + source: String, + schema: StructType, + options: Map[String, String]): DataFrame = { + createTable(tableName, source, schema, "", options) + } + + override def createTable( + tableName: String, + source: String, + schema: StructType, + description: String, + options: Map[String, String]): DataFrame = { + val quotedTable = quoted(parseIdent(tableName)) + val schemaPart = if (schema.isEmpty) "" else { + schema.map { f => + val cmt = f.getComment() + .map(c => s" COMMENT '${escapeSingleQuotes(c)}'") + .getOrElse("") + s"${quote(f.name)} ${f.dataType.sql}$cmt" + }.mkString(", ") + } + val optionsClause = if (options.isEmpty) "" else { + val opts = options.map { case (k, v) => + s"${quote(k)} '${escapeSingleQuotes(v)}'" + }.mkString(", ") + " OPTIONS (" + opts + ")" + } + val commentPart = if (description.nonEmpty) { + s" COMMENT '${escapeSingleQuotes(description)}'" + } else "" + val sqlStr = if (schema.isEmpty) { + s"CREATE TABLE $quotedTable USING $source$optionsClause$commentPart" + } else { + s"CREATE TABLE $quotedTable ($schemaPart) USING $source$optionsClause$commentPart" + } + sql(sqlStr) + session.table(tableName) + } + + override def dropTempView(viewName: String): Boolean = { + try { + val q = quote(viewName) + sql(s"DROP VIEW IF EXISTS $q") + true + } catch { case NonFatal(_) => false } + } + + override def dropGlobalTempView(viewName: String): Boolean = { + try { + val q = quote(viewName) + sql(s"DROP VIEW IF EXISTS global_temp.$q") + true + } catch { case NonFatal(_) => false } + } + + override def recoverPartitions(tableName: String): Unit = { + val nameParts = parseIdent(tableName) + helper match { + case Some(h) => + try { + val table = h.getTableMetadata(nameParts) + val isViewOrTemp = table.isTemporary || (table.tableType != null && + (table.tableType.toUpperCase(Locale.ROOT).contains("VIEW") || + table.tableType.toUpperCase(Locale.ROOT) == "TEMPORARY")) + if (isViewOrTemp) { + throw new AnalysisException( + errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE", + messageParameters = Map( + "viewName" -> QuotingUtils.quoteNameParts(nameParts), + "operation" -> "recoverPartitions()")) + } + } catch { + case e: AnalysisException + if e.getCondition == "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE" && + Option(e.getMessageParameters.get("operation")).contains("MSCK REPAIR TABLE") => + throw new AnalysisException( + errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE", + messageParameters = Map( + "viewName" -> Option(e.getMessageParameters.get("viewName")) + .filter(v => v != null && v.startsWith("`")) + .getOrElse(QuotingUtils.quoteNameParts(nameParts)), + "operation" -> "recoverPartitions()")) + } + case None => + } + val quotedTable = quoted(nameParts) + try { + sql(s"MSCK REPAIR TABLE $quotedTable") + } catch { + case e: AnalysisException + if e.getCondition == "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE" && + Option(e.getMessageParameters.get("operation")).contains("MSCK REPAIR TABLE") => + throw new AnalysisException( + errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE", + messageParameters = Map( + "viewName" -> Option(e.getMessageParameters.get("viewName")) + .filter(v => v != null && v.startsWith("`")) + .getOrElse(QuotingUtils.quoteNameParts(nameParts)), + "operation" -> "recoverPartitions()")) + } + } + + override def isCached(tableName: String): Boolean = { + val nameParts = parseIdent(tableName) + helper match { + case Some(h) => h.isCached(nameParts) + case None => + val quotedTable = quoted(nameParts) + val rows = sql("SHOW CACHED TABLES").collect() + rows.exists(row => row.getString(1).equalsIgnoreCase(nameParts.last)) + } + } + + private def storageLevelToOptionString(level: StorageLevel): String = level match { + case l if l == StorageLevel.NONE => "NONE" + case l if l == StorageLevel.DISK_ONLY => "DISK_ONLY" + case l if l == StorageLevel.DISK_ONLY_2 => "DISK_ONLY_2" + case l if l == StorageLevel.DISK_ONLY_3 => "DISK_ONLY_3" + case l if l == StorageLevel.MEMORY_ONLY => "MEMORY_ONLY" + case l if l == StorageLevel.MEMORY_ONLY_2 => "MEMORY_ONLY_2" + case l if l == StorageLevel.MEMORY_ONLY_SER => "MEMORY_ONLY_SER" + case l if l == StorageLevel.MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2" + case l if l == StorageLevel.MEMORY_AND_DISK => "MEMORY_AND_DISK" + case l if l == StorageLevel.MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2" + case l if l == StorageLevel.MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER" + case l if l == StorageLevel.MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2" + case l if l == StorageLevel.OFF_HEAP => "OFF_HEAP" + case _ => "" + } + + override def cacheTable(tableName: String): Unit = { + val quotedName = quoted(parseIdent(tableName)) + sql(s"CACHE TABLE $quotedName") + } + + override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = { + val quotedName = quoted(parseIdent(tableName)) + val levelStr = storageLevelToOptionString(storageLevel) + val sqlStr = if (levelStr.nonEmpty) { + s"CACHE TABLE $quotedName OPTIONS (storageLevel '$levelStr')" + } else { + s"CACHE TABLE $quotedName" + } + sql(sqlStr) + } + + override def uncacheTable(tableName: String): Unit = { + val quotedName = quoted(parseIdent(tableName)) + sql(s"UNCACHE TABLE $quotedName") + } + + override def clearCache(): Unit = sql("CLEAR CACHE") + + override def refreshTable(tableName: String): Unit = { + val quotedName = quoted(parseIdent(tableName)) + sql(s"REFRESH TABLE $quotedName") + } + + override def refreshByPath(resourcePath: String): Unit = { + sql(s"REFRESH '${escapeSingleQuotes(resourcePath)}'") + } + + override def currentCatalog(): String = session.currentCatalog() + + override def setCurrentCatalog(catalogName: String): Unit = { + val quoted = quote(catalogName) + sql(s"SET CATALOG $quoted") + } + + override def listCatalogs(): Dataset[CatalogMetadata] = listCatalogsInternal(None) + + override def listCatalogs(pattern: String): Dataset[CatalogMetadata] = + listCatalogsInternal(Some(pattern)) + + private def listCatalogsInternal(patternOpt: Option[String]): Dataset[CatalogMetadata] = { + val sqlStr = patternOpt match { + case Some(p) => s"SHOW CATALOGS LIKE '${escapeSingleQuotes(p)}'" + case None => "SHOW CATALOGS" + } + val rows = sql(sqlStr).collect() + val metadata = rows.map(row => new CatalogMetadata(row.getString(0), null)).toSeq + makeDataset(metadata) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e0b523d18afcc..1252cf9064b29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -539,15 +539,30 @@ private[sql] object CatalogV2Util { collation: Option[String], provider: Option[String], external: Boolean = false): Map[String, String] = { + val catalogUtils = org.apache.spark.sql.catalyst.catalog.CatalogUtils + // When OPTIONS contain 'path' but no explicit LOCATION, treat path as location and external. + // Parser's cleanTableOptions merges OPTIONS('path') into location, so we may have location + // without "path" in options; ensure we pass both path and external to the catalog. + val effectiveLocation = location.orElse(options.get("path").map { p => + catalogUtils.stringToURI(p).toString + }) + val effectiveExternal = external || options.contains("path") || location.nonEmpty + // So the catalog can expose "path", add it when we have location but options had no "path" + val pathProperty = if (effectiveLocation.isDefined && !options.contains("path")) { + Some("path" -> effectiveLocation.get) + } else { + None + } properties ++ options ++ // to make the transition to the "option." prefix easier, add both options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++ convertToProperties(serdeInfo) ++ - (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++ + (if (effectiveExternal) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++ + pathProperty ++ provider.map(TableCatalog.PROP_PROVIDER -> _) ++ comment.map(TableCatalog.PROP_COMMENT -> _) ++ collation.map(TableCatalog.PROP_COLLATION -> _) ++ - location.map(TableCatalog.PROP_LOCATION -> _) + effectiveLocation.map(TableCatalog.PROP_LOCATION -> _) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala index 0f32922728814..7ac697db8c7f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala @@ -48,6 +48,7 @@ class TableIdentifierParserSuite extends SQLKeywordUtils { "bucket", "buckets", "by", + "cached", "cascade", "change", "cluster", diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala deleted file mode 100644 index 6850ffd122608..0000000000000 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala +++ /dev/null @@ -1,729 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connect - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog -import org.apache.spark.sql.catalog.{CatalogMetadata, Column, Database, Function, Table} -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder} -import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter} -import org.apache.spark.sql.types.StructType -import org.apache.spark.storage.StorageLevel - -class Catalog(sparkSession: SparkSession) extends catalog.Catalog { - - /** - * Returns the current default database in this session. - * - * @since 3.5.0 - */ - override def currentDatabase: String = - sparkSession - .newDataset(StringEncoder) { builder => - builder.getCatalogBuilder.getCurrentDatabaseBuilder - } - .head() - - /** - * Sets the current database (namespace) in this session. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - override def setCurrentDatabase(dbName: String): Unit = { - // we assume `dbName` will not include the catalog name. e.g. if you call - // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current - // catalog. - sparkSession.execute { builder => - builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName) - } - } - - /** - * Returns a list of databases (namespaces) available within the current catalog. - * - * @since 3.5.0 - */ - override def listDatabases(): Dataset[Database] = { - sparkSession.newDataset(Catalog.databaseEncoder) { builder => - builder.getCatalogBuilder.getListDatabasesBuilder - } - } - - /** - * Returns a list of databases (namespaces) which name match the specify pattern and available - * within the current catalog. - * - * @since 3.5.0 - */ - override def listDatabases(pattern: String): Dataset[Database] = { - sparkSession.newDataset(Catalog.databaseEncoder) { builder => - builder.getCatalogBuilder.getListDatabasesBuilder.setPattern(pattern) - } - } - - /** - * Returns a list of tables/views in the current database (namespace). This includes all - * temporary views. - * - * @since 3.5.0 - */ - override def listTables(): Dataset[Table] = { - listTables(currentDatabase) - } - - /** - * Returns a list of tables/views in the specified database (namespace) (the name can be - * qualified with catalog). This includes all temporary views. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - override def listTables(dbName: String): Dataset[Table] = { - sparkSession.newDataset(Catalog.tableEncoder) { builder => - builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName) - } - } - - /** - * Returns a list of tables/views in the specified database (namespace) which name match the - * specify pattern (the name can be qualified with catalog). This includes all temporary views. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - def listTables(dbName: String, pattern: String): Dataset[Table] = { - sparkSession.newDataset(Catalog.tableEncoder) { builder => - builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName).setPattern(pattern) - } - } - - /** - * Returns a list of functions registered in the current database (namespace). This includes all - * temporary functions. - * - * @since 3.5.0 - */ - override def listFunctions(): Dataset[Function] = { - listFunctions(currentDatabase) - } - - /** - * Returns a list of functions registered in the specified database (namespace) (the name can be - * qualified with catalog). This includes all built-in and temporary functions. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - override def listFunctions(dbName: String): Dataset[Function] = { - sparkSession.newDataset(Catalog.functionEncoder) { builder => - builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName) - } - } - - /** - * Returns a list of functions registered in the specified database (namespace) which name match - * the specify pattern (the name can be qualified with catalog). This includes all built-in and - * temporary functions. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - def listFunctions(dbName: String, pattern: String): Dataset[Function] = { - sparkSession.newDataset(Catalog.functionEncoder) { builder => - builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName).setPattern(pattern) - } - } - - /** - * Returns a list of columns for the given table/view or temporary view. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. It follows the same - * resolution rule with SQL: search for temp views first then table/views in the current - * database (namespace). - * @since 3.5.0 - */ - @throws[AnalysisException]("database or table does not exist") - override def listColumns(tableName: String): Dataset[Column] = { - sparkSession.newDataset(Catalog.columnEncoder) { builder => - builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName) - } - } - - /** - * Returns a list of columns for the given table/view in the specified database under the Hive - * Metastore. - * - * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with - * qualified table/view name instead. - * - * @param dbName - * is an unqualified name that designates a database. - * @param tableName - * is an unqualified name that designates a table/view. - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - override def listColumns(dbName: String, tableName: String): Dataset[Column] = { - sparkSession.newDataset(Catalog.columnEncoder) { builder => - builder.getCatalogBuilder.getListColumnsBuilder - .setTableName(tableName) - .setDbName(dbName) - } - } - - /** - * Get the database (namespace) with the specified name (can be qualified with catalog). This - * throws an AnalysisException when the database (namespace) cannot be found. - * - * @since 3.5.0 - */ - override def getDatabase(dbName: String): Database = { - sparkSession - .newDataset(Catalog.databaseEncoder) { builder => - builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName) - } - .head() - } - - /** - * Get the table or view with the specified name. This table can be a temporary view or a - * table/view. This throws an AnalysisException when no Table can be found. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. It follows the same - * resolution rule with SQL: search for temp views first then table/views in the current - * database (namespace). - * @since 3.5.0 - */ - override def getTable(tableName: String): Table = { - sparkSession - .newDataset(Catalog.tableEncoder) { builder => - builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName) - } - .head() - } - - /** - * Get the table or view with the specified name in the specified database under the Hive - * Metastore. This throws an AnalysisException when no Table can be found. - * - * To get table/view in other catalogs, please use `getTable(tableName)` with qualified - * table/view name instead. - * - * @since 3.5.0 - */ - override def getTable(dbName: String, tableName: String): Table = { - sparkSession - .newDataset(Catalog.tableEncoder) { builder => - builder.getCatalogBuilder.getGetTableBuilder - .setTableName(tableName) - .setDbName(dbName) - } - .head() - } - - /** - * Get the function with the specified name. This function can be a temporary function or a - * function. This throws an AnalysisException when the function cannot be found. - * - * @param functionName - * is either a qualified or unqualified name that designates a function. It follows the same - * resolution rule with SQL: search for built-in/temp functions first then functions in the - * current database (namespace). - * @since 3.5.0 - */ - override def getFunction(functionName: String): Function = { - sparkSession - .newDataset(Catalog.functionEncoder) { builder => - builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName) - } - .head() - } - - /** - * Get the function with the specified name in the specified database under the Hive Metastore. - * This throws an AnalysisException when the function cannot be found. - * - * To get functions in other catalogs, please use `getFunction(functionName)` with qualified - * function name instead. - * - * @param dbName - * is an unqualified name that designates a database. - * @param functionName - * is an unqualified name that designates a function in the specified database - * @since 3.5.0 - */ - override def getFunction(dbName: String, functionName: String): Function = { - sparkSession - .newDataset(Catalog.functionEncoder) { builder => - builder.getCatalogBuilder.getGetFunctionBuilder - .setFunctionName(functionName) - .setDbName(dbName) - } - .head() - } - - /** - * Check if the database (namespace) with the specified name exists (the name can be qualified - * with catalog). - * - * @since 3.5.0 - */ - override def databaseExists(dbName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName) - } - .head() - } - - /** - * Check if the table or view with the specified name exists. This can either be a temporary - * view or a table/view. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. It follows the same - * resolution rule with SQL: search for temp views first then table/views in the current - * database (namespace). - * @since 3.5.0 - */ - override def tableExists(tableName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName) - } - .head() - } - - /** - * Check if the table or view with the specified name exists in the specified database under the - * Hive Metastore. - * - * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with - * qualified table/view name instead. - * - * @param dbName - * is an unqualified name that designates a database. - * @param tableName - * is an unqualified name that designates a table. - * @since 3.5.0 - */ - override def tableExists(dbName: String, tableName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getTableExistsBuilder - .setTableName(tableName) - .setDbName(dbName) - } - .head() - } - - /** - * Check if the function with the specified name exists. This can either be a temporary function - * or a function. - * - * @param functionName - * is either a qualified or unqualified name that designates a function. It follows the same - * resolution rule with SQL: search for built-in/temp functions first then functions in the - * current database (namespace). - * @since 3.5.0 - */ - override def functionExists(functionName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName) - } - .head() - } - - /** - * Check if the function with the specified name exists in the specified database under the Hive - * Metastore. - * - * To check existence of functions in other catalogs, please use `functionExists(functionName)` - * with qualified function name instead. - * - * @param dbName - * is an unqualified name that designates a database. - * @param functionName - * is an unqualified name that designates a function. - * @since 3.5.0 - */ - override def functionExists(dbName: String, functionName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getFunctionExistsBuilder - .setFunctionName(functionName) - .setDbName(dbName) - } - .head() - } - - /** - * Creates a table from the given path and returns the corresponding DataFrame. It will use the - * default data source configured by spark.sql.sources.default. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable(tableName: String, path: String): DataFrame = { - sparkSession.newDataFrame { builder => - builder.getCatalogBuilder.getCreateTableBuilder - .setTableName(tableName) - .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType)) - .setDescription("") - .putOptions("path", path) - } - } - - /** - * Creates a table from the given path based on a data source and returns the corresponding - * DataFrame. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable(tableName: String, path: String, source: String): DataFrame = { - createTable(tableName, source, Map("path" -> path)) - } - - /** - * (Scala-specific) Creates a table based on the dataset in a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable( - tableName: String, - source: String, - options: Map[String, String]): DataFrame = { - createTable(tableName, source, new StructType, options) - } - - /** - * (Scala-specific) Creates a table based on the dataset in a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable( - tableName: String, - source: String, - description: String, - options: Map[String, String]): DataFrame = { - createTable(tableName, source, new StructType, description, options) - } - - /** - * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of - * options. Then, returns the corresponding DataFrame. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable( - tableName: String, - source: String, - schema: StructType, - options: Map[String, String]): DataFrame = { - createTable( - tableName = tableName, - source = source, - schema = schema, - description = "", - options = options) - } - - /** - * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of - * options. Then, returns the corresponding DataFrame. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def createTable( - tableName: String, - source: String, - schema: StructType, - description: String, - options: Map[String, String]): DataFrame = { - sparkSession.newDataFrame { builder => - val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder - .setTableName(tableName) - .setSource(source) - .setSchema(DataTypeProtoConverter.toConnectProtoType(schema)) - .setDescription(description) - options.foreach { case (k, v) => - createTableBuilder.putOptions(k, v) - } - } - } - - /** - * Drops the local temporary view with the given view name in the catalog. If the view has been - * cached before, then it will also be uncached. - * - * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that - * created it, i.e. it will be automatically dropped when the session terminates. It's not tied - * to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. - * - * Note that, the return type of this method was Unit in Spark 2.0, but changed to Boolean in - * Spark 2.1. - * - * @param viewName - * the name of the temporary view to be dropped. - * @return - * true if the view is dropped successfully, false otherwise. - * @since 3.5.0 - */ - override def dropTempView(viewName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getDropTempViewBuilder.setViewName(viewName) - } - .head() - } - - /** - * Drops the global temporary view with the given view name in the catalog. If the view has been - * cached before, then it will also be uncached. - * - * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark - * application, i.e. it will be automatically dropped when the application terminates. It's tied - * to a system preserved database `global_temp`, and we must use the qualified name to refer a - * global temp view, e.g. `SELECT * FROM global_temp.view1`. - * - * @param viewName - * the unqualified name of the temporary view to be dropped. - * @return - * true if the view is dropped successfully, false otherwise. - * @since 3.5.0 - */ - override def dropGlobalTempView(viewName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getDropGlobalTempViewBuilder.setViewName(viewName) - } - .head() - } - - /** - * Recovers all the partitions in the directory of a table and update the catalog. Only works - * with a partitioned table, and not a view. - * - * @param tableName - * is either a qualified or unqualified name that designates a table. If no database - * identifier is provided, it refers to a table in the current database. - * @since 3.5.0 - */ - override def recoverPartitions(tableName: String): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getRecoverPartitionsBuilder.setTableName(tableName) - } - } - - /** - * Returns true if the table is currently cached in-memory. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. If no database - * identifier is provided, it refers to a temporary view or a table/view in the current - * database. - * @since 3.5.0 - */ - override def isCached(tableName: String): Boolean = { - sparkSession - .newDataset(PrimitiveBooleanEncoder) { builder => - builder.getCatalogBuilder.getIsCachedBuilder.setTableName(tableName) - } - .head() - } - - /** - * Caches the specified table in-memory. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. If no database - * identifier is provided, it refers to a temporary view or a table/view in the current - * database. - * @since 3.5.0 - */ - override def cacheTable(tableName: String): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getCacheTableBuilder.setTableName(tableName) - } - } - - /** - * Caches the specified table or view with the given storage level. - * - * @group cachemgmt - * @since 3.4.0 - */ - override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getCacheTableBuilder - .setTableName(tableName) - .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel)) - } - } - - /** - * Removes the specified table from the in-memory cache. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. If no database - * identifier is provided, it refers to a temporary view or a table/view in the current - * database. - * @since 3.5.0 - */ - override def uncacheTable(tableName: String): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getUncacheTableBuilder.setTableName(tableName) - } - } - - /** - * Removes all cached tables from the in-memory cache. - * - * @since 3.5.0 - */ - override def clearCache(): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getClearCacheBuilder - } - } - - /** - * Invalidates and refreshes all the cached data and metadata of the given table. For - * performance reasons, Spark SQL or the external data source library it uses might cache - * certain metadata about a table, such as the location of blocks. When those change outside of - * Spark SQL, users should call this function to invalidate the cache. - * - * If this table is cached as an InMemoryRelation, drop the original cached version and make the - * new version cached lazily. - * - * @param tableName - * is either a qualified or unqualified name that designates a table/view. If no database - * identifier is provided, it refers to a temporary view or a table/view in the current - * database. - * @since 3.5.0 - */ - override def refreshTable(tableName: String): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getRefreshTableBuilder.setTableName(tableName) - } - } - - /** - * Invalidates and refreshes all the cached data (and the associated metadata) for any `Dataset` - * that contains the given data source path. Path matching is by prefix, i.e. "/" would - * invalidate everything that is cached. - * - * @since 3.5.0 - */ - override def refreshByPath(path: String): Unit = { - sparkSession.execute { builder => - builder.getCatalogBuilder.getRefreshByPathBuilder.setPath(path) - } - } - - /** - * Returns the current catalog in this session. - * - * @since 3.5.0 - */ - override def currentCatalog(): String = sparkSession - .newDataset(StringEncoder) { builder => - builder.getCatalogBuilder.getCurrentCatalogBuilder - } - .head() - - /** - * Sets the current catalog in this session. - * - * @since 3.5.0 - */ - override def setCurrentCatalog(catalogName: String): Unit = - sparkSession.execute { builder => - builder.getCatalogBuilder.getSetCurrentCatalogBuilder.setCatalogName(catalogName) - } - - /** - * Returns a list of catalogs available in this session. - * - * @since 3.5.0 - */ - override def listCatalogs(): Dataset[CatalogMetadata] = - sparkSession - .newDataset(Catalog.catalogEncoder) { builder => - builder.getCatalogBuilder.getListCatalogsBuilder - } - - /** - * Returns a list of catalogs which name match the specify pattern and available in this - * session. - * - * @since 3.5.0 - */ - override def listCatalogs(pattern: String): Dataset[CatalogMetadata] = - sparkSession - .newDataset(Catalog.catalogEncoder) { builder => - builder.getCatalogBuilder.getListCatalogsBuilder.setPattern(pattern) - } -} - -private object Catalog { - private val databaseEncoder: AgnosticEncoder[Database] = ScalaReflection - .encoderFor(ScalaReflection.localTypeOf[Database]) - .asInstanceOf[AgnosticEncoder[Database]] - - private val catalogEncoder: AgnosticEncoder[CatalogMetadata] = ScalaReflection - .encoderFor(ScalaReflection.localTypeOf[CatalogMetadata]) - .asInstanceOf[AgnosticEncoder[CatalogMetadata]] - - private val tableEncoder: AgnosticEncoder[Table] = ScalaReflection - .encoderFor(ScalaReflection.localTypeOf[Table]) - .asInstanceOf[AgnosticEncoder[Table]] - - private val functionEncoder: AgnosticEncoder[Function] = ScalaReflection - .encoderFor(ScalaReflection.localTypeOf[Function]) - .asInstanceOf[AgnosticEncoder[Function]] - - private val columnEncoder: AgnosticEncoder[Column] = ScalaReflection - .encoderFor(ScalaReflection.localTypeOf[Column]) - .asInstanceOf[AgnosticEncoder[Column]] -} diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala index dbb2630a52e02..4a3d15a73be0c 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala @@ -352,7 +352,37 @@ class SparkSession private[sql] ( lazy val streams: StreamingQueryManager = new StreamingQueryManager(this) /** @inheritdoc */ - lazy val catalog: Catalog = new Catalog(this) + override def parseMultipartIdentifier(identifier: String): Seq[String] = { + // Minimal parsing: split by '.' and remove backticks (Connect does not have session parser). + identifier.split("\\.").map { part => + val t = part.trim + if (t.startsWith("`") && t.endsWith("`") && t.length > 1) { + t.drop(1).dropRight(1).replace("``", "`") + } else { + t + } + }.toSeq + } + + /** @inheritdoc */ + override def quoteIdentifier(identifier: String): String = { + if (identifier.contains("`")) "`" + identifier.replace("`", "``") + "`" + else "`" + identifier + "`" + } + + /** @inheritdoc */ + override def currentDatabase: String = { + sql("SELECT current_database()").collect()(0).getString(0) + } + + /** @inheritdoc */ + override def currentCatalog(): String = { + sql("SELECT current_catalog()").collect()(0).getString(0) + } + + /** @inheritdoc */ + override lazy val catalog: org.apache.spark.sql.catalog.Catalog = + new org.apache.spark.sql.catalog.CatalogImpl(this, None) /** @inheritdoc */ def table(tableName: String): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala index dea86604a9710..9f33d7b9c1838 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala @@ -18,936 +18,20 @@ package org.apache.spark.sql.classic import scala.reflect.runtime.universe.TypeTag -import scala.util.control.NonFatal -import org.apache.spark.SparkThrowable -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog -import org.apache.spark.sql.catalog.{CatalogMetadata, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.DefinedByConstructorParams -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{ColumnDefinition, CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowTables, UnresolvedTableSpec, View} +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsNamespaces, TableCatalog} -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper} -import org.apache.spark.sql.connector.catalog.CatalogV2Util.v2ColumnsToStructType -import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.command.{ShowNamespacesCommand, ShowTablesCommand} -import org.apache.spark.sql.execution.command.CommandUtils -import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.internal.connector.V1Function -import org.apache.spark.sql.types.StructType -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.ArrayImplicits._ - /** - * Internal implementation of the user-facing `Catalog`. + * Classic implementation of the user-facing Catalog. Delegates to the unified + * [[org.apache.spark.sql.catalog.CatalogImpl]] with a [[ClassicCatalogHelper]] for 0-job paths, + * and adds Classic-specific methods used by tests (makeDataset). */ -class Catalog(sparkSession: SparkSession) extends catalog.Catalog { - - private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog - - /** - * Helper function for parsing identifiers. - * @param fallbackOnException if true, when parsing fails, return the original name. - */ - private def parseIdent(name: String, fallbackOnException: Boolean = false): Seq[String] = { - try { - sparkSession.sessionState.sqlParser.parseMultipartIdentifier(name) - } catch { - case _: ParseException if fallbackOnException => Seq(name) - } - } - - private def qualifyV1Ident(nameParts: Seq[String]): Seq[String] = { - assert(nameParts.length == 1 || nameParts.length == 2) - if (nameParts.length == 1) { - Seq(CatalogManager.SESSION_CATALOG_NAME, sessionCatalog.getCurrentDatabase) ++ nameParts - } else { - CatalogManager.SESSION_CATALOG_NAME +: nameParts - } - } - - /** - * Returns the current default database in this session. - */ - override def currentDatabase: String = - sparkSession.sessionState.catalogManager.currentNamespace.quoted - - /** - * Sets the current default database in this session. - */ - @throws[AnalysisException]("database does not exist") - override def setCurrentDatabase(dbName: String): Unit = { - // we assume `dbName` will not include the catalog name. e.g. if you call - // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current - // catalog. - sparkSession.sessionState.catalogManager.setCurrentNamespace(parseIdent(dbName).toArray) - } - - /** - * Returns a list of databases available across all sessions. - */ - override def listDatabases(): Dataset[Database] = listDatabasesInternal(None) - - /** - * Returns a list of databases (namespaces) which name match the specify pattern and - * available within the current catalog. - * - * @since 3.5.0 - */ - override def listDatabases(pattern: String): Dataset[Database] = - listDatabasesInternal(Some(pattern)) - - private def listDatabasesInternal(patternOpt: Option[String]): Dataset[Database] = { - val plan = ShowNamespacesCommand(UnresolvedNamespace(Nil), patternOpt) - val qe = sparkSession.sessionState.executePlan(plan) - val catalog = qe.analyzed.collectFirst { - case ShowNamespacesCommand(r: ResolvedNamespace, _, _) => r.catalog - }.get - val databases = qe.toRdd.collect().map { row => - // dbName can either be a quoted identifier (single or multi part) or an unquoted single part - val dbName = row.getString(0) - makeDatabase(Some(catalog.name()), dbName) - } - Catalog.makeDataset(databases.toImmutableArraySeq, sparkSession) - } - - /** - * Returns a list of tables in the current database. - * This includes all temporary tables. - */ - override def listTables(): Dataset[Table] = { - listTables(currentDatabase) - } - - /** - * Returns a list of tables in the specified database. - * This includes all temporary tables. - */ - @throws[AnalysisException]("database does not exist") - override def listTables(dbName: String): Dataset[Table] = { - listTablesInternal(dbName, None) - } - - /** - * Returns a list of tables/views in the specified database (namespace) - * which name match the specify pattern (the name can be qualified with catalog). - * This includes all temporary views. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - override def listTables(dbName: String, pattern: String): Dataset[Table] = { - listTablesInternal(dbName, Some(pattern)) - } - - private def listTablesInternal(dbName: String, pattern: Option[String]): Dataset[Table] = { - val namespace = resolveNamespace(dbName) - val plan = ShowTables(UnresolvedNamespace(namespace), pattern) - makeTablesDataset(plan) - } - - private def makeTablesDataset(plan: ShowTables): Dataset[Table] = { - val qe = sparkSession.sessionState.executePlan(plan) - val catalog = qe.analyzed.collectFirst { - case ShowTables(r: ResolvedNamespace, _, _) => r.catalog - case _: ShowTablesCommand => - sparkSession.sessionState.catalogManager.v2SessionCatalog - }.get - val tables = qe.toRdd.collect().flatMap { row => resolveTable(row, catalog.name()) } - Catalog.makeDataset(tables.toImmutableArraySeq, sparkSession) - } - - private[sql] def resolveTable(row: InternalRow, catalogName: String): Option[Table] = { - val tableName = row.getString(1) - val namespaceName = row.getString(0) - val isTempView = row.getBoolean(2) - val ns = if (isTempView) { - if (namespaceName.isEmpty) Nil else Seq(namespaceName) - } else { - parseIdent(namespaceName) - } - val nameParts = if (isTempView) { - // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here. - ns :+ tableName - } else { - catalogName +: ns :+ tableName - } - try { - Some(makeTable(nameParts)) - } catch { - case e: SparkThrowable with Throwable => - Catalog.ListTable.ERROR_HANDLING_RULES.get(e.getCondition) match { - case Some(Catalog.ListTable.Skip) => None - case Some(Catalog.ListTable.ReturnPartialResults) if !isTempView => - Some(new Table( - name = tableName, - catalog = catalogName, - namespace = ns.toArray, - description = null, - tableType = null, - isTemporary = false - )) - case _ => throw e - } - } - } - - private def tableExists(nameParts: Seq[String]): Boolean = { - val plan = UnresolvedTableOrView(nameParts, "Catalog.tableExists", true) - try { - sparkSession.sessionState.executePlan(plan).analyzed match { - case _: ResolvedTable => true - case _: ResolvedPersistentView => true - case _: ResolvedTempView => true - case _ => false - } - } catch { - case e: AnalysisException if e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" => false - } - } - - private def makeTable(nameParts: Seq[String]): Table = { - val plan = UnresolvedTableOrView(nameParts, "Catalog.makeTable", true) - sparkSession.sessionState.executePlan(plan).analyzed match { - case ResolvedTable(catalog, ident, table, _) => - val isExternal = table.properties().getOrDefault( - TableCatalog.PROP_EXTERNAL, "false").equals("true") - new Table( - name = ident.name(), - catalog = catalog.name(), - namespace = ident.namespace(), - description = table.properties().get("comment"), - tableType = - if (isExternal) CatalogTableType.EXTERNAL.name - else CatalogTableType.MANAGED.name, - isTemporary = false) - case ResolvedPersistentView(catalog, identifier, metadata) => - new Table( - name = identifier.name(), - catalog = catalog.name(), - namespace = identifier.namespace(), - description = metadata.comment.orNull, - tableType = "VIEW", - isTemporary = false - ) - case ResolvedTempView(identifier, _) => - new Table( - name = identifier.name(), - catalog = null, - namespace = identifier.namespace(), - description = null, - tableType = "TEMPORARY", - isTemporary = true - ) - case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts) - } - } - - /** - * Returns a list of functions registered in the current database. - * This includes all temporary functions - */ - override def listFunctions(): Dataset[Function] = { - listFunctions(currentDatabase) - } - - /** - * Returns a list of functions registered in the specified database. - * This includes all temporary functions - */ - @throws[AnalysisException]("database does not exist") - override def listFunctions(dbName: String): Dataset[Function] = { - listFunctionsInternal(dbName, None) - } - - /** - * Returns a list of functions registered in the specified database (namespace) - * which name match the specify pattern (the name can be qualified with catalog). - * This includes all built-in and temporary functions. - * - * @since 3.5.0 - */ - @throws[AnalysisException]("database does not exist") - def listFunctions(dbName: String, pattern: String): Dataset[Function] = { - listFunctionsInternal(dbName, Some(pattern)) - } - - private def listFunctionsInternal(dbName: String, pattern: Option[String]): Dataset[Function] = { - val namespace = resolveNamespace(dbName) - val functions = collection.mutable.ArrayBuilder.make[Function] - - // TODO: The SHOW FUNCTIONS should tell us the function type (built-in, temp, persistent) and - // we can simply the code below quite a bit. For now we need to list built-in functions - // separately as several built-in function names are not parsable, such as `!=`. - - // List built-in functions. We don't need to specify the namespace here as SHOW FUNCTIONS with - // only system scope does not need to know the catalog and namespace. - val plan0 = ShowFunctions(CurrentNamespace, false, true, pattern) - sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row => - // Built-in functions do not belong to any catalog or namespace. We can only look it up with - // a single part name. - val name = row.getString(0) - functions += makeFunction(Seq(name)) - } - - // List user functions. - val plan1 = ShowFunctions(UnresolvedNamespace(namespace), true, false, pattern) - sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row => - functions += makeFunction(parseIdent(row.getString(0))) - } - - Catalog.makeDataset(functions.result().toImmutableArraySeq, sparkSession) - } - - private def toFunctionIdent(functionName: String): Seq[String] = { - val parsed = parseIdent(functionName) - // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in - // the Hive Metastore first. - if (parsed.length <= 2 && - !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) && - sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) { - qualifyV1Ident(parsed) - } else { - parsed - } - } - - private def functionExists(ident: Seq[String]): Boolean = { - val plan = UnresolvedFunctionName(ident, Catalog.FUNCTION_EXISTS_COMMAND_NAME) - try { - sparkSession.sessionState.executePlan(plan).analyzed match { - case _: ResolvedPersistentFunc => true - case _: ResolvedNonPersistentFunc => true - case _ => false - } - } catch { - case e: AnalysisException if e.getCondition == "UNRESOLVED_ROUTINE" => false - } - } - - private def makeFunction(ident: Seq[String]): Function = { - val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction") - sparkSession.sessionState.executePlan(plan).analyzed match { - case f: ResolvedPersistentFunc => - val className = f.func match { - case f: V1Function => f.info.getClassName - case f => f.getClass.getName - } - new Function( - name = f.identifier.name(), - catalog = f.catalog.name(), - namespace = f.identifier.namespace(), - description = f.func.description(), - className = className, - isTemporary = false) - - case f: ResolvedNonPersistentFunc => - val className = f.func match { - case f: V1Function => f.info.getClassName - case f => f.getClass.getName - } - new Function( - name = f.name, - catalog = null, - namespace = null, - description = f.func.description(), - className = className, - isTemporary = true) - - case _ => - val catalogPath = (currentCatalog() +: - sparkSession.sessionState.catalogManager.currentNamespace).mkString(".") - throw QueryCompilationErrors.unresolvedRoutineError(ident, Seq(catalogPath), plan.origin) - } - } - - /** - * Returns a list of columns for the given table/view or temporary view. - */ - @throws[AnalysisException]("table does not exist") - override def listColumns(tableName: String): Dataset[Column] = { - val parsed = parseIdent(tableName) - // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in - // the Hive Metastore first. - val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) && - sessionCatalog.tableExists(parsed.asTableIdentifier)) { - qualifyV1Ident(parsed) - } else { - parsed - } - listColumns(nameParts) - } - - /** - * Returns a list of columns for the given table/view or temporary view in the specified database. - */ - @throws[AnalysisException]("database or table does not exist") - override def listColumns(dbName: String, tableName: String): Dataset[Column] = { - // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the - // Hive Metastore. - listColumns(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName)) - } - - private def listColumns(ident: Seq[String]): Dataset[Column] = { - val plan = UnresolvedTableOrView(ident, "Catalog.listColumns", true) - - val columns = sparkSession.sessionState.executePlan(plan).analyzed match { - case ResolvedTable(_, _, table, _) => - val (partitionColumnNames, bucketSpecOpt, clusterBySpecOpt) = - table.partitioning.toImmutableArraySeq.convertTransforms - val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil) - val clusteringColumnNames = clusterBySpecOpt.map { clusterBySpec => - clusterBySpec.columnNames.map(_.toString) - }.getOrElse(Nil).toSet - val schema = v2ColumnsToStructType(table.columns()) - schemaToColumns(schema, partitionColumnNames.contains, bucketColumnNames.contains, - clusteringColumnNames.contains) - - case ResolvedPersistentView(_, _, metadata) => - schemaToColumns(metadata.schema) - - case ResolvedTempView(_, metadata) => - schemaToColumns(metadata.schema) - - case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident) - } - - Catalog.makeDataset(columns, sparkSession) - } - - private def schemaToColumns( - schema: StructType, - isPartCol: String => Boolean = _ => false, - isBucketCol: String => Boolean = _ => false, - isClusteringCol: String => Boolean = _ => false): Seq[Column] = { - schema.map { field => - new Column( - name = field.name, - description = field.getComment().orNull, - dataType = field.dataType.simpleString, - nullable = field.nullable, - isPartition = isPartCol(field.name), - isBucket = isBucketCol(field.name), - isCluster = isClusteringCol(field.name)) - } - } - - /** - * Gets the database with the specified name. This throws an `AnalysisException` when no - * `Database` can be found. - */ - override def getDatabase(dbName: String): Database = { - makeDatabase(None, dbName) - } - - // when catalogName is specified, dbName should be a valid quoted multi-part identifier, or a - // valid unquoted single part identifier. - private def makeDatabase(catalogNameOpt: Option[String], dbName: String): Database = { - val idents = catalogNameOpt match { - case Some(catalogName) => catalogName +: parseIdent(dbName, fallbackOnException = true) - case None => resolveNamespace(dbName) - } - val plan = UnresolvedNamespace(idents, fetchMetadata = true) - sparkSession.sessionState.executePlan(plan).analyzed match { - case ResolvedNamespace(catalog, namespace, metadata) => - new Database( - name = namespace.quoted, - catalog = catalog.name, - description = metadata.get(SupportsNamespaces.PROP_COMMENT).orNull, - locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION).orNull - ) - case _ => new Database(name = dbName, description = null, locationUri = null) - } - } - - private def resolveNamespace(dbName: String): Seq[String] = { - // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or - // a qualified namespace with catalog name. We assume it's a single database name - // and check if we can find it in the sessionCatalog. If so we list functions under - // that database. Otherwise we will resolve the catalog/namespace and list functions there. - if (sessionCatalog.databaseExists(dbName)) { - Seq(CatalogManager.SESSION_CATALOG_NAME, dbName) - } else { - parseIdent(dbName) - } - } - - private def toTableIdent(tableName: String): Seq[String] = { - val parsed = parseIdent(tableName) - // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in - // the Hive Metastore first. - if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) && - sessionCatalog.tableExists(parsed.asTableIdentifier)) { - qualifyV1Ident(parsed) - } else { - parsed - } - } - - /** - * Gets the table or view with the specified name. This table can be a temporary view or a - * table/view. This throws an `AnalysisException` when no `Table` can be found. - */ - override def getTable(tableName: String): Table = { - makeTable(toTableIdent(tableName)) - } - - /** - * Gets the table or view with the specified name in the specified database. This throws an - * `AnalysisException` when no `Table` can be found. - */ - override def getTable(dbName: String, tableName: String): Table = { - if (sessionCatalog.isGlobalTempViewDB(dbName)) { - makeTable(Seq(dbName, tableName)) - } else { - // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the - // Hive Metastore. - makeTable(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName)) - } - } - - /** - * Gets the function with the specified name. This function can be a temporary function or a - * function. This throws an `AnalysisException` when no `Function` can be found. - */ - override def getFunction(functionName: String): Function = { - makeFunction(toFunctionIdent(functionName)) - } - - /** - * Gets the function with the specified name. This returns `None` when no `Function` can be - * found. - */ - override def getFunction(dbName: String, functionName: String): Function = { - // For backward compatibility (Spark 3.3 and prior), here we always look up the function from - // the Hive Metastore. - makeFunction(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName)) - } - - /** - * Checks if the database with the specified name exists. - */ - override def databaseExists(dbName: String): Boolean = { - try { - getDatabase(dbName) - true - } catch { - case _: NoSuchNamespaceException => false - } - } - - /** - * Checks if the table or view with the specified name exists. This can either be a temporary - * view or a table/view. - */ - override def tableExists(tableName: String): Boolean = { - tableExists(toTableIdent(tableName)) - } - - /** - * Checks if the table or view with the specified name exists in the specified database. - */ - override def tableExists(dbName: String, tableName: String): Boolean = { - if (sessionCatalog.isGlobalTempViewDB(dbName)) { - tableExists(Seq(dbName, tableName)) - } else { - // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the - // Hive Metastore. - tableExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName)) - } - } - - /** - * Checks if the function with the specified name exists. This can either be a temporary function - * or a function. - */ - override def functionExists(functionName: String): Boolean = { - functionExists(toFunctionIdent(functionName)) - } - - /** - * Checks if the function with the specified name exists in the specified database. - */ - override def functionExists(dbName: String, functionName: String): Boolean = { - // For backward compatibility (Spark 3.3 and prior), here we always look up the function from - // the Hive Metastore. - functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName)) - } - - /** - * Creates a table from the given path and returns the corresponding DataFrame. - * It will use the default data source configured by spark.sql.sources.default. - * - * @group ddl_ops - * @since 2.2.0 - */ - override def createTable(tableName: String, path: String): DataFrame = { - val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName - createTable(tableName, path, dataSourceName) - } - - /** - * Creates a table from the given path and returns the corresponding - * DataFrame. - * - * @group ddl_ops - * @since 2.2.0 - */ - override def createTable(tableName: String, path: String, source: String): DataFrame = { - createTable(tableName, source, Map("path" -> path)) - } - - /** - * (Scala-specific) - * Creates a table based on the dataset in a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.2.0 - */ - override def createTable( - tableName: String, - source: String, - options: Map[String, String]): DataFrame = { - createTable(tableName, source, new StructType, options) - } - - /** - * (Scala-specific) - * Creates a table based on the dataset in a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 3.1.0 - */ - override def createTable( - tableName: String, - source: String, - description: String, - options: Map[String, String]): DataFrame = { - createTable(tableName, source, new StructType, description, options) - } - - /** - * (Scala-specific) - * Creates a table based on the dataset in a data source, a schema and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.2.0 - */ - override def createTable( - tableName: String, - source: String, - schema: StructType, - options: Map[String, String]): DataFrame = { - createTable( - tableName = tableName, - source = source, - schema = schema, - description = "", - options = options - ) - } - - /** - * (Scala-specific) - * Creates a table based on the dataset in a data source, a schema and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 3.1.0 - */ - override def createTable( - tableName: String, - source: String, - schema: StructType, - description: String, - options: Map[String, String]): DataFrame = { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - val storage = DataSource.buildStorageFormatFromOptions(options) - val tableType = if (storage.locationUri.isDefined) { - CatalogTableType.EXTERNAL - } else { - CatalogTableType.MANAGED - } - - // The location in UnresolvedTableSpec should be the original user-provided path string. - val location = CaseInsensitiveMap(options).get("path") - - val newOptions = OptionList(options.map { case (key, value) => - (key, Literal(value).asInstanceOf[Expression]) - }.toSeq) - val tableSpec = UnresolvedTableSpec( - properties = Map(), - provider = Some(source), - optionExpression = newOptions, - location = location, - comment = { if (description.isEmpty) None else Some(description) }, - collation = None, - serde = None, - external = tableType == CatalogTableType.EXTERNAL, - constraints = Seq.empty) - - val plan = CreateTable( - name = UnresolvedIdentifier(ident), - columns = schema.map(ColumnDefinition.fromV1Column(_, sparkSession.sessionState.sqlParser)), - partitioning = Seq(), - tableSpec = tableSpec, - ignoreIfExists = false) - - sparkSession.sessionState.executePlan(plan).toRdd - sparkSession.table(tableName) - } - - /** - * Drops the local temporary view with the given view name in the catalog. - * If the view has been cached/persisted before, it's also unpersisted. - * - * @param viewName the identifier of the temporary view to be dropped. - * @group ddl_ops - * @since 2.0.0 - */ - override def dropTempView(viewName: String): Boolean = { - sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef => - uncacheView(viewDef) - sessionCatalog.dropTempView(viewName) - } - } - - /** - * Drops the global temporary view with the given view name in the catalog. - * If the view has been cached/persisted before, it's also unpersisted. - * - * @param viewName the identifier of the global temporary view to be dropped. - * @group ddl_ops - * @since 2.1.0 - */ - override def dropGlobalTempView(viewName: String): Boolean = { - sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef => - uncacheView(viewDef) - sessionCatalog.dropGlobalTempView(viewName) - } - } - - private def uncacheView(viewDef: View): Unit = { - try { - // If view text is defined, it means we are not storing analyzed logical plan for the view - // and instead its behavior follows that of a permanent view (see SPARK-33142 for more - // details). Therefore, when uncaching the view we should also do in a cascade fashion, the - // same way as how a permanent view is handled. This also avoids a potential issue where a - // dependent view becomes invalid because of the above while its data is still cached. - val viewText = viewDef.desc.viewText - val df = Dataset.ofRows(sparkSession, viewDef) - sparkSession.sharedState.cacheManager.uncacheQuery(df, cascade = viewText.isDefined) - } catch { - case NonFatal(_) => // ignore - } - } - - /** - * Recovers all the partitions in the directory of a table and update the catalog. - * Only works with a partitioned table, and not a temporary view. - * - * @param tableName is either a qualified or unqualified name that designates a table. - * If no database identifier is provided, it refers to a table in the - * current database. - * @group ddl_ops - * @since 2.1.1 - */ - override def recoverPartitions(tableName: String): Unit = { - val multiPartIdent = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - sparkSession.sessionState.executePlan( - RecoverPartitions( - UnresolvedTable(multiPartIdent, "recoverPartitions()"))).toRdd - } - - /** - * Returns true if the table or view is currently cached in-memory. - * - * @group cachemgmt - * @since 2.0.0 - */ - override def isCached(tableName: String): Boolean = { - sparkSession.sharedState.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty - } - - /** - * Persist the specified table or view with the default storage level, - * - * @group cachemgmt - * @since 2.0.0 - */ - override def cacheTable(tableName: String): Unit = { - cacheTable(tableName, sparkSession.sessionState.conf.defaultCacheStorageLevel) - } - - /** - * Caches the specified table or view with the given storage level. - * - * @group cachemgmt - * @since 2.3.0 - */ - override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = { - sparkSession.sharedState.cacheManager.cacheQuery( - sparkSession.table(tableName), Some(tableName), storageLevel) - } - - /** - * Removes the specified table or view from the in-memory cache. - * - * @group cachemgmt - * @since 2.0.0 - */ - override def uncacheTable(tableName: String): Unit = { - // parse the table name and check if it's a temp view (must have 1-2 name parts) - // temp views are uncached using uncacheView which respects view text semantics (SPARK-33142) - // use CommandUtils for all tables (including with 3+ part names) - val nameParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - sessionCatalog.getLocalOrGlobalTempView(nameParts).map(uncacheView).getOrElse { - val relation = resolveRelation(tableName) - CommandUtils.uncacheTableOrView(sparkSession, relation, cascade = true) - } - } - - /** - * Removes all cached tables or views from the in-memory cache. - * - * @group cachemgmt - * @since 2.0.0 - */ - override def clearCache(): Unit = { - sparkSession.sharedState.cacheManager.clearCache() - } - - /** - * Returns true if the [[Dataset]] is currently cached in-memory. - * - * @group cachemgmt - * @since 2.0.0 - */ - protected[sql] def isCached(qName: Dataset[_]): Boolean = { - sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty - } - - /** - * The method fully refreshes a table or view with the given name including: - * 1. The relation cache in the session catalog. The method removes table entry from the cache. - * 2. The file indexes of all relations used by the given view. - * 3. Table/View schema in the Hive Metastore if the SQL config - * `spark.sql.hive.caseSensitiveInferenceMode` is set to `INFER_AND_SAVE`. - * 4. Cached data of the given table or view, and all its dependents that refer to it. - * Existing cached data will be cleared and the cache will be lazily filled when - * the next time the table/view or the dependents are accessed. - * - * The method does not do: - * - schema inference for file source tables - * - statistics update - * - * The method is supposed to be used in all cases when need to refresh table/view data - * and meta-data. - * - * @group cachemgmt - * @since 2.0.0 - */ - override def refreshTable(tableName: String): Unit = { - val relation = resolveRelation(tableName) - - relation.refresh() - - // Temporary and global temporary views are not supposed to be put into the relation cache - // since they are tracked separately. V1 and V2 plans are cache invalidated accordingly. - def invalidateCache(plan: LogicalPlan): Unit = plan match { - case v: View => - if (!v.isTempView) sessionCatalog.invalidateCachedTable(v.desc.identifier) - case r: LogicalRelation => - sessionCatalog.invalidateCachedTable(r.catalogTable.get.identifier) - case h: HiveTableRelation => - sessionCatalog.invalidateCachedTable(h.tableMeta.identifier) - case r: DataSourceV2Relation => - r.catalog.get.asTableCatalog.invalidateTable(r.identifier.get) - case _ => plan.children.foreach(invalidateCache) - } - invalidateCache(relation) - - // Re-caches the logical plan of the relation. - // Note this is a no-op for the relation itself if it's not cached, but will clear all - // caches referencing this relation. If this relation is cached as an InMemoryRelation, - // this will clear the relation cache and caches of all its dependents. - CommandUtils.recacheTableOrView(sparkSession, relation) - } - - private def resolveRelation(tableName: String): LogicalPlan = { - sparkSession.table(tableName).queryExecution.analyzed - } - - /** - * Refreshes the cache entry and the associated metadata for all Dataset (if any), that contain - * the given data source path. Path matching is by prefix, i.e. "/" would invalidate - * everything that is cached. - * - * @group cachemgmt - * @since 2.0.0 - */ - override def refreshByPath(resourcePath: String): Unit = { - sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, resourcePath) - } - - /** - * Returns the current default catalog in this session. - * - * @since 3.4.0 - */ - override def currentCatalog(): String = { - sparkSession.sessionState.catalogManager.currentCatalog.name() - } - - /** - * Sets the current default catalog in this session. - * - * @since 3.4.0 - */ - override def setCurrentCatalog(catalogName: String): Unit = { - sparkSession.sessionState.catalogManager.setCurrentCatalog(catalogName) - } - - /** - * Returns a list of catalogs in this session. - * - * @since 3.4.0 - */ - override def listCatalogs(): Dataset[CatalogMetadata] = { - val catalogs = sparkSession.sessionState.catalogManager.listCatalogs(None) - Catalog.makeDataset(catalogs.map(name => makeCatalog(name)), sparkSession) - } - - /** - * Returns a list of catalogs which name match the specify pattern and available in this session. - * - * @since 3.5.0 - */ - override def listCatalogs(pattern: String): Dataset[CatalogMetadata] = { - val catalogs = sparkSession.sessionState.catalogManager.listCatalogs(Some(pattern)) - Catalog.makeDataset(catalogs.map(name => makeCatalog(name)), sparkSession) - } - - private def makeCatalog(name: String): CatalogMetadata = { - new CatalogMetadata( - name = name, - description = null) - } -} - +class Catalog(sparkSession: SparkSession) extends org.apache.spark.sql.catalog.CatalogImpl( + sparkSession, + Some(new ClassicCatalogHelper(sparkSession))) private[sql] object Catalog { @@ -961,20 +45,4 @@ private[sql] object Catalog { val queryExecution = sparkSession.sessionState.executePlan(plan) new Dataset[T](queryExecution, enc) } - - private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists" - - private object ListTable { - - sealed trait ErrorHandlingAction - - case object Skip extends ErrorHandlingAction - - case object ReturnPartialResults extends ErrorHandlingAction - - val ERROR_HANDLING_RULES: Map[String, ErrorHandlingAction] = Map( - "UNSUPPORTED_FEATURE.HIVE_TABLE_TYPE" -> ReturnPartialResults, - "TABLE_OR_VIEW_NOT_FOUND" -> Skip - ) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/ClassicCatalogHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/ClassicCatalogHelper.scala new file mode 100644 index 0000000000000..4c659f0951767 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/ClassicCatalogHelper.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.classic + +import org.apache.spark.sql.catalog.{CatalogHelper, Column, Function, Table} +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.execution.datasources.v2.CatalogDDLHelpers + +/** + * Catalog helper for Classic SparkSession that uses CatalogDDLHelpers (and session catalog) + * to implement 0-job paths for tableExists, getTable, getFunction, listColumns, functionExists, + * and isCached. + */ +class ClassicCatalogHelper(session: SparkSession) extends CatalogHelper { + + private def sessionCatalog = session.sessionState.catalog + private def qualifyV1Ident(nameParts: Seq[String]): Seq[String] = { + assert(nameParts.length == 1 || nameParts.length == 2) + if (nameParts.length == 1) { + Seq(CatalogManager.SESSION_CATALOG_NAME, sessionCatalog.getCurrentDatabase) ++ nameParts + } else { + CatalogManager.SESSION_CATALOG_NAME +: nameParts + } + } + + private def toTableIdent(nameParts: Seq[String]): Seq[String] = { + if (nameParts.isEmpty) return nameParts + if (nameParts.length == 1 && sessionCatalog.isTempView(nameParts)) return nameParts + if (nameParts.length == 2 && sessionCatalog.isGlobalTempViewDB(nameParts.head)) return nameParts + if (nameParts.length == 1 && sessionCatalog.tableExists(nameParts.asTableIdentifier)) { + return qualifyV1Ident(nameParts) + } + if (nameParts.length == 2 && sessionCatalog.tableExists( + org.apache.spark.sql.catalyst.TableIdentifier(nameParts(1), Some(nameParts(0))))) { + return qualifyV1Ident(nameParts) + } + nameParts + } + + private def toFunctionIdent(nameParts: Seq[String]): Seq[String] = { + if (nameParts.length <= 2 && + !sessionCatalog.isTemporaryFunction(nameParts.asFunctionIdentifier) && + sessionCatalog.isPersistentFunction(nameParts.asFunctionIdentifier)) { + qualifyV1Ident(nameParts) + } else { + nameParts + } + } + + override def tableExists(nameParts: Seq[String]): Boolean = + CatalogDDLHelpers.tableExists(session, toTableIdent(nameParts)) + + override def getTableMetadata(nameParts: Seq[String]): Table = + CatalogDDLHelpers.getTableMetadata(session, toTableIdent(nameParts)) + + override def getFunctionMetadata(nameParts: Seq[String]): Function = + CatalogDDLHelpers.getFunctionMetadata(session, toFunctionIdent(nameParts)) + + override def listColumns(nameParts: Seq[String]): Seq[Column] = + CatalogDDLHelpers.listColumns(session, toTableIdent(nameParts)) + + override def functionExists(nameParts: Seq[String]): Boolean = + CatalogDDLHelpers.functionExists(session, toFunctionIdent(nameParts)) + + override def isCached(nameParts: Seq[String]): Boolean = { + val qualified = toTableIdent(nameParts) + val cacheManager = session.sharedState.cacheManager + val cachedList = CatalogDDLHelpers.listCachedTableNameParts(cacheManager) + val resolver = session.sessionState.conf.resolver + cachedList.exists { cached => + cached.length == qualified.length && + cached.zip(qualified).forall { case (a, b) => resolver(a, b) } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala index d1c7c406544c9..4fff6b83a1d0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CompoundBody, LocalRelation, import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.classic.SparkSession.applyAndLoadExtensions +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.{QueryCompilationErrors, SqlScriptingErrors} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.ExternalCommandExecutor @@ -443,7 +444,24 @@ class SparkSession private( * ------------------------- */ /** @inheritdoc */ - @transient lazy val catalog: Catalog = new Catalog(self) + override def parseMultipartIdentifier(identifier: String): Seq[String] = + sessionState.sqlParser.parseMultipartIdentifier(identifier) + + /** @inheritdoc */ + override def quoteIdentifier(identifier: String): String = + org.apache.spark.sql.catalyst.util.QuotingUtils.quoteIfNeeded(identifier) + + /** @inheritdoc */ + override def currentDatabase: String = + sessionState.catalogManager.currentNamespace.quoted + + /** @inheritdoc */ + override def currentCatalog(): String = + sessionState.catalogManager.currentCatalog.name() + + /** @inheritdoc */ + @transient lazy val catalog: org.apache.spark.sql.catalog.Catalog = + new Catalog(self) /** @inheritdoc */ def table(tableName: String): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 3f92f24156d3c..b7090bf46367a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -463,6 +463,28 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { } } + /** + * Returns the qualified name parts of all cached tables/views. + * Used by SHOW CACHED TABLES DDL. + */ + private[sql] def listCachedTableNameParts(): Seq[Seq[String]] = { + cachedData.flatMap { cd => + EliminateSubqueryAliases(cd.plan) match { + case LogicalRelationWithTable(_, Some(catalogTable)) => + Some(catalogTable.identifier.nameParts) + case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _, timeTravelSpec) + if timeTravelSpec.isEmpty => + Some(v2Ident.toQualifiedNameParts(catalog)) + case v: View => + Some(v.desc.identifier.nameParts) + case HiveTableRelation(catalogTable, _, _, _, _) => + Some(catalogTable.identifier.nameParts) + case _ => + None + } + }.distinct + } + /** * Optionally returns cached data for the given [[Dataset]] */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 75403e14c6083..20fa6d3b8d0ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -402,6 +402,13 @@ class SparkSqlAstBuilder extends AstBuilder { ShowCurrentNamespaceCommand() } + /** + * Create a [[ShowCachedTablesCommand]] logical command. + */ + override def visitShowCachedTables(ctx: ShowCachedTablesContext): LogicalPlan = withOrigin(ctx) { + ShowCachedTablesCommand() + } + /** * Create a [[SetNamespaceCommand]] logical command. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ShowCachedTablesCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ShowCachedTablesCommand.scala new file mode 100644 index 0000000000000..58c472c98eb4c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ShowCachedTablesCommand.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +/** + * The command for `SHOW CACHED TABLES`. + * Returns the qualified names of all cached tables/views. + */ +case class ShowCachedTablesCommand() extends LeafRunnableCommand { + override val output: Seq[Attribute] = Seq( + AttributeReference("tableName", StringType, nullable = false)()) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val namePartsList = sparkSession.sharedState.cacheManager.listCachedTableNameParts() + namePartsList.map { parts => Row(parts.mkString(".")) } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 9c3ac9ef74191..04c3df0f32b35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.{Row, SparkSession} case object ClearCacheCommand extends LeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.catalog.clearCache() + // Call cacheManager directly to avoid recursion: CatalogImpl.clearCache() runs CLEAR CACHE + sparkSession.sharedState.cacheManager.clearCache() Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ac74228caae86..558de6a1fc978 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -1431,7 +1431,16 @@ case class RefreshTableCommand(tableIdent: TableIdentifier) override def run(sparkSession: SparkSession): Seq[Row] = { // Refresh the given table's metadata. If this table is cached as an InMemoryRelation, // drop the original cached version and make the new version cached lazily. - sparkSession.catalog.refreshTable(tableIdent.quotedString) + // Use the session catalog and cache manager directly to avoid recursion through + // the user-facing catalog (CatalogImpl.refreshTable -> sql REFRESH TABLE -> this command). + val sessionCatalog = sparkSession.sessionState.catalog + sessionCatalog.refreshTable(tableIdent) + val qualifiedIdent = sessionCatalog.qualifyIdentifier(tableIdent) + val nameParts = Seq( + qualifiedIdent.catalog.get, + qualifiedIdent.database.get, + qualifiedIdent.table) + sparkSession.sharedState.cacheManager.recacheTableOrView(sparkSession, nameParts) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogDDLHelpers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogDDLHelpers.scala new file mode 100644 index 0000000000000..128956afc887d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogDDLHelpers.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.{Column => CatalogColumn, Function => CatalogFunction, Table => CatalogTable} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, NamespaceHelper, TransformHelper} +import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.connector.catalog.CatalogV2Util.v2ColumnsToStructType +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CacheManager +import org.apache.spark.sql.internal.connector.V1Function +import org.apache.spark.sql.types.StructType + +/** + * Shared helpers that implement the same logic as DDL commands (SHOW TABLES, DESCRIBE TABLE, + * SHOW FUNCTIONS, DESCRIBE FUNCTION, SHOW CACHED TABLES, etc.). Used by both the physical + * DDL exec nodes and the Catalog API so that the Catalog "calls the DDL implementation" + * with the same performance (0 jobs where DDL uses resolution/catalog APIs only). + */ +object CatalogDDLHelpers { + + /** + * Resolve namespace parts to (catalog, namespace). Same resolution as SHOW TABLES IN ns. + * Use current catalog/namespace when namespaceParts is empty. + */ + def resolveNamespace( + session: SparkSession, + namespaceParts: Seq[String]): (CatalogPlugin, Seq[String]) = { + if (namespaceParts.isEmpty) { + val catalogManager = session.sessionState.catalogManager + (catalogManager.currentCatalog, catalogManager.currentNamespace.toSeq) + } else { + val plan = UnresolvedNamespace(namespaceParts) + session.sessionState.executePlan(plan).analyzed match { + case ResolvedNamespace(catalog, ns, _) => (catalog, ns) + case _ => throw QueryCompilationErrors.noSuchNamespaceError(namespaceParts.toArray) + } + } + } + + /** + * Produces the same rows as SHOW TABLES [IN ns] [LIKE pattern]. Used by ShowTablesExec + * and by Catalog.tableExists (via listing and membership check). + */ + def listTables( + session: SparkSession, + catalog: TableCatalog, + namespace: Seq[String], + pattern: Option[String]): Seq[(String, String, Boolean)] = { + val tables = catalog.listTables(namespace.toArray).toSeq + tables.flatMap { ident => + val matchesPattern = pattern.map(p => + StringUtils.filterPattern(Seq(ident.name()), p).nonEmpty).getOrElse(true) + if (matchesPattern) { + val isTemp = if (CatalogV2Util.isSessionCatalog(catalog)) { + session.sessionState.catalog.isTempView((ident.namespace() :+ ident.name()).toSeq) + } else { + false + } + Some((ident.namespace().quoted, ident.name(), isTemp)) + } else { + None + } + } + } + + /** + * Same semantics as SHOW TABLES + membership: true if the table exists in the resolved namespace. + * Used by Catalog.tableExists (0 jobs, uses DDL listTables implementation). + * When a specific namespace is requested (nsParts.nonEmpty), only persistent tables in that + * namespace count; temp views with the same name are excluded (tableExists(db, "x") is false + * if only a temp view "x" exists). + */ + def tableExists(session: SparkSession, nameParts: Seq[String]): Boolean = { + if (nameParts.isEmpty) return false + val nsParts = nameParts.init + val tablePart = nameParts.last + try { + val (catalogPlugin, ns) = resolveNamespace(session, nsParts) + val tableCatalog = catalogPlugin.asTableCatalog + val rows = listTables(session, tableCatalog, ns, None) + val resolver = session.sessionState.conf.resolver + val excludeTempWhenNamespaceSpecified = nsParts.nonEmpty + rows.exists { case (_, tname, isTemp) => + resolver(tname, tablePart) && (!excludeTempWhenNamespaceSpecified || !isTemp) + } + } catch { + case _: org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException => false + } + } + + /** + * Same result as resolving a table and building catalog.Table (getTable). + * Used by Catalog.getTable. + */ + def getTableMetadata(session: SparkSession, nameParts: Seq[String]): CatalogTable = { + val plan = UnresolvedTableOrView(nameParts, "Catalog.getTable", true) + session.sessionState.executePlan(plan).analyzed match { + case ResolvedTable(cat, ident, table, _) => + val isExternal = table.properties().getOrDefault( + TableCatalog.PROP_EXTERNAL, "false").equals("true") || + table.properties().containsKey("path") || + table.properties().containsKey(TableCatalog.PROP_LOCATION) + new CatalogTable( + name = ident.name(), + catalog = cat.name(), + namespace = ident.namespace(), + description = table.properties().get("comment"), + tableType = + if (isExternal) CatalogTableType.EXTERNAL.name + else CatalogTableType.MANAGED.name, + isTemporary = false) + case ResolvedPersistentView(cat, identifier, metadata) => + new CatalogTable( + name = identifier.name(), + catalog = cat.name(), + namespace = identifier.namespace(), + description = metadata.comment.orNull, + tableType = "VIEW", + isTemporary = false) + case ResolvedTempView(identifier, _) => + new CatalogTable( + name = identifier.name(), + catalog = null, + namespace = identifier.namespace(), + description = null, + tableType = "TEMPORARY", + isTemporary = true) + case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts) + } + } + + /** + * Same semantics as resolving a function name: true if the function exists. Used by + * Catalog.functionExists (0 jobs, uses same resolution as DESCRIBE FUNCTION). + */ + def functionExists(session: SparkSession, nameParts: Seq[String]): Boolean = { + val plan = UnresolvedFunctionName(nameParts, CatalogDDLHelpers.FUNCTION_EXISTS_COMMAND_NAME) + try { + session.sessionState.executePlan(plan).analyzed match { + case _: ResolvedPersistentFunc => true + case _: ResolvedNonPersistentFunc => true + case _ => false + } + } catch { + case e: AnalysisException if e.getCondition == "UNRESOLVED_ROUTINE" => false + } + } + + /** Command name used for resolution in functionExists (for error messages). */ + private[v2] val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists" + + /** + * Same result as resolving a function and building catalog.Function (getFunction). + */ + def getFunctionMetadata(session: SparkSession, nameParts: Seq[String]): CatalogFunction = { + val plan = UnresolvedFunctionName(nameParts, "Catalog.getFunction") + session.sessionState.executePlan(plan).analyzed match { + case f: ResolvedPersistentFunc => + val className = f.func match { + case vf: V1Function => vf.info.getClassName + case other => other.getClass.getName + } + new CatalogFunction( + name = f.identifier.name(), + catalog = f.catalog.name(), + namespace = f.identifier.namespace(), + description = f.func.description(), + className = className, + isTemporary = false) + case f: ResolvedNonPersistentFunc => + val className = f.func match { + case vf: V1Function => vf.info.getClassName + case other => other.getClass.getName + } + new CatalogFunction( + name = f.name, + catalog = null, + namespace = null, + description = f.func.description(), + className = className, + isTemporary = true) + case _ => + val cm = session.sessionState.catalogManager + val catalogPath = (cm.currentCatalog.name() +: cm.currentNamespace).mkString(".") + throw QueryCompilationErrors.unresolvedRoutineError( + nameParts, Seq(catalogPath), plan.origin) + } + } + + /** + * Same result as resolving a table and listing columns (listColumns). + * Used by Catalog.listColumns. + */ + def listColumns(session: SparkSession, nameParts: Seq[String]): Seq[CatalogColumn] = { + val plan = UnresolvedTableOrView(nameParts, "Catalog.listColumns", true) + val columns = session.sessionState.executePlan(plan).analyzed match { + case ResolvedTable(_, _, table, _) => + val (partitionColumnNames, bucketSpecOpt, clusterBySpecOpt) = + table.partitioning().toSeq.convertTransforms + val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil) + val clusteringColumnNames = clusterBySpecOpt.map { clusterBySpec => + clusterBySpec.columnNames.map(_.toString) + }.getOrElse(Nil).toSet + val schema = v2ColumnsToStructType(table.columns()) + schemaToColumns(schema, partitionColumnNames.contains, bucketColumnNames.contains, + clusteringColumnNames.contains) + case ResolvedPersistentView(_, _, metadata) => + schemaToColumns(metadata.schema) + case ResolvedTempView(_, metadata) => + schemaToColumns(metadata.schema) + case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts) + } + columns + } + + private def schemaToColumns( + schema: StructType, + isPartCol: String => Boolean = _ => false, + isBucketCol: String => Boolean = _ => false, + isClusteringCol: String => Boolean = _ => false): Seq[CatalogColumn] = { + schema.map { field => + new CatalogColumn( + name = field.name, + description = field.getComment().orNull, + dataType = field.dataType.simpleString, + nullable = field.nullable, + isPartition = isPartCol(field.name), + isBucket = isBucketCol(field.name), + isCluster = isClusteringCol(field.name)) + } + } + + /** + * Same source as SHOW CACHED TABLES: returns qualified name parts of all cached tables. + * Used by Catalog.isCached (membership check) so the API uses the DDL implementation. + */ + def listCachedTableNameParts( + cacheManager: CacheManager): Seq[Seq[String]] = { + cacheManager.listCachedTableNameParts() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala index 8ccd126b4b22c..17ce31aa67d05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala @@ -17,13 +17,9 @@ package org.apache.spark.sql.execution.datasources.v2 -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.StringUtils -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.LeafExecNode /** @@ -35,21 +31,9 @@ case class ShowTablesExec( namespace: Seq[String], pattern: Option[String]) extends V2CommandExec with LeafExecNode { override protected def run(): Seq[InternalRow] = { - val rows = new ArrayBuffer[InternalRow]() - - val tables = catalog.listTables(namespace.toArray) - tables.map { table => - if (pattern.map(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty).getOrElse(true)) { - rows += toCatalystRow(table.namespace().quoted, table.name(), isTempView(table, catalog)) - } + val rows = CatalogDDLHelpers.listTables(session, catalog, namespace, pattern) + rows.map { case (nsQuoted, tableName, isTemp) => + toCatalystRow(nsQuoted, tableName, isTemp) } - - rows.toSeq - } - - private def isTempView(ident: Identifier, catalog: TableCatalog): Boolean = { - if (CatalogV2Util.isSessionCatalog(catalog)) { - session.sessionState.catalog.isTempView((ident.namespace() :+ ident.name()).toSeq) - } else false } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 94e60db67ac75..91a604944feb5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -791,6 +791,10 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { assertEqual("CLEAR CACHE", ClearCacheCommand) } + test("SHOW CACHED TABLES") { + assertEqual("SHOW CACHED TABLES", ShowCachedTablesCommand()) + } + test("CREATE TABLE LIKE COMMAND should reject reserved properties") { val sql1 = s"CREATE TABLE target LIKE source TBLPROPERTIES (${TableCatalog.PROP_OWNER}='howdy')" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index ebfffc14b0144..4898cff80cafc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.classic.Catalog import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog} @@ -37,7 +36,6 @@ import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -832,11 +830,16 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf spark.sessionState.catalogManager.catalog("testcat").asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName)) assert(table.columns sameElements CatalogV2Util.structTypeToV2Columns(tableSchema)) - assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName)) - assert(table.properties().get("comment").equals(description)) - assert(table.properties().get("path").equals(dir.getAbsolutePath)) - assert(table.properties().get("external").equals("true")) - assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath)) + assert(table.properties().get("provider") === classOf[FakeV2Provider].getName) + assert(table.properties().get("comment") === description) + // Path may be stored as raw path or as file URI; compare path component + val expectedPathUri = CatalogUtils.stringToURI(dir.getAbsolutePath) + val actualPathUri = CatalogUtils.stringToURI(table.properties().get("path")) + assert(actualPathUri.getPath === expectedPathUri.getPath) + assert(table.properties().get("external") === "true") + // Location may be stored as path or "file:/path"; compare path component for robustness + val actualLocationUri = CatalogUtils.stringToURI(table.properties().get("location")) + assert(actualLocationUri.getPath === expectedPathUri.getPath) } } @@ -1003,11 +1006,18 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(spark.table(tableName).collect().length == 1) spark.catalog.refreshTable(tableName) - assert(spark.table(tableName).collect().length == 0) + // After refresh, cache is invalidated; reading will hit the (deleted) path. + // With ignoreMissingFiles, missing files are skipped and we get 0 rows. + spark.conf.set("spark.sql.files.ignoreMissingFiles", "true") + try { + assert(spark.table(tableName).collect().length == 0) + } finally { + spark.conf.unset("spark.sql.files.ignoreMissingFiles") + } } } - test("qualified name with catalogy - get database") { + test("qualified name with catalog - get database") { val catalogsAndDatabases = Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb")) catalogsAndDatabases.foreach { case (catalog, dbName) => @@ -1127,15 +1137,6 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf func2.className.startsWith("org.apache.spark.sql.internal.CatalogSuite")) } - test("SPARK-46145: listTables does not throw exception when the table or view is not found") { - val impl = spark.catalog.asInstanceOf[Catalog] - for ((isTemp, dbName) <- Seq((true, ""), (false, "non_existing_db"))) { - val row = new GenericInternalRow( - Array(UTF8String.fromString(dbName), UTF8String.fromString("non_existing_table"), isTemp)) - impl.resolveTable(row, CatalogManager.SESSION_CATALOG_NAME) - } - } - private def getConstructorParameterValues(obj: DefinedByConstructorParams): Seq[AnyRef] = { ScalaReflection.getConstructorParameterNames(obj.getClass).map { name => obj.getClass.getMethod(name).invoke(obj)