diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 9d12201c91a2..e8a4bea03cc7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -122,8 +122,8 @@ public class SparkCatalog extends SparkBaseCatalog
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
-        checkRequiredConfigurations();
         SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
+        checkRequiredConfigurations(sparkSession);
         this.catalogName = name;
         CatalogContext catalogContext =
                 CatalogContext.create(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index b4ba7efc0960..fe3d807f57f4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -314,7 +314,7 @@ case class PaimonSparkWriter(
         writeWithoutBucket(input)
 
       case HASH_FIXED =>
-        if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
+        if (paimonExtensionEnabled(sparkSession) && BucketFunction.supportsTable(table)) {
           // Topology: input -> shuffle by partition & bucket
           val bucketNumber = coreOptions.bucket()
           val bucketKeyCol = tableSchema
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 2e6014ff1a41..7a6fa547c2a9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
 import org.apache.paimon.table.Table
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.internal.StaticSQLConf
 
@@ -36,12 +37,14 @@ object OptionUtils extends SQLConfHelper with Logging {
 
   private val PAIMON_OPTION_PREFIX = "spark.paimon."
   private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."
+  private val PAIMON_SPARK_SESSION_EXTENSIONS =
+    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
 
-  def paimonExtensionEnabled: Boolean = {
-    conf
+  def paimonExtensionEnabled(sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.conf
       .getConf(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
       .getOrElse(Seq.empty)
-      .contains("org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+      .contains(PAIMON_SPARK_SESSION_EXTENSIONS)
   }
 
   def getOptionString(option: ConfigOption[_]): String = {
@@ -63,15 +66,22 @@ object OptionUtils extends SQLConfHelper with Logging {
     }
   }
 
-  def checkRequiredConfigurations(): Unit = {
-    if (getOptionString(SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED).toBoolean) {
-      if (!paimonExtensionEnabled) {
-        throw new RuntimeException(
-          """
-            |When using Paimon, it is necessary to configure `spark.sql.extensions` and ensure that it includes `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
-            |You can disable this check by configuring `spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly discouraged to do so.
-            |""".stripMargin)
-      }
+  private def requiredSparkConfsCheckEnabled(sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.conf
+      .getConfString(
+        s"$PAIMON_OPTION_PREFIX${SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.key()}",
+        SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.defaultValue().toString
+      )
+      .toBoolean
+  }
+
+  def checkRequiredConfigurations(sparkSession: SparkSession): Unit = {
+    if (requiredSparkConfsCheckEnabled(sparkSession) && !paimonExtensionEnabled(sparkSession)) {
+      throw new RuntimeException(
+        """
+          |When using Paimon, it is necessary to configure `spark.sql.extensions` and ensure that it includes `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
+          |You can disable this check by configuring `spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly discouraged to do so.
+ |""".stripMargin) } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala index 14351103f425..f602be161c34 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala @@ -19,10 +19,12 @@ package org.apache.paimon.spark.sql import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.spark.util.OptionUtils import org.apache.paimon.table.FileStoreTableFactory import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.paimon.Utils import org.junit.jupiter.api.Assertions @@ -265,4 +267,41 @@ class PaimonConfigCheckTest extends SparkFunSuite { } } } + + test("Paimon Option: required confs check with temporary SQLConf") { + val spark = SparkSession + .builder() + .master("local[2]") + .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog") + .config("spark.sql.catalog.paimon.warehouse", Utils.createTempDir.getCanonicalPath) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .config("spark.paimon.requiredSparkConfsCheck.enabled", "true") + .getOrCreate() + try { + SQLConf.withExistingConf(new SQLConf) { + OptionUtils.checkRequiredConfigurations(spark) + } + } finally { + spark.close() + } + } + + test("Paimon Option: required confs switch with temporary SQLConf") { + val spark = SparkSession + .builder() + .master("local[2]") + .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog") + .config("spark.sql.catalog.paimon.warehouse", Utils.createTempDir.getCanonicalPath) + .config("spark.paimon.requiredSparkConfsCheck.enabled", "false") + .getOrCreate() + try { + SQLConf.withExistingConf(new SQLConf) { + OptionUtils.checkRequiredConfigurations(spark) + } + } finally { + spark.close() + } + } }