Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public class SparkCatalog extends SparkBaseCatalog

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
checkRequiredConfigurations();
SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
checkRequiredConfigurations(sparkSession);
this.catalogName = name;
CatalogContext catalogContext =
CatalogContext.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ case class PaimonSparkWriter(
writeWithoutBucket(input)

case HASH_FIXED =>
if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
if (paimonExtensionEnabled(sparkSession) && BucketFunction.supportsTable(table)) {
// Topology: input -> shuffle by partition & bucket
val bucketNumber = coreOptions.bucket()
val bucketKeyCol = tableSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
import org.apache.paimon.table.Table

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.internal.StaticSQLConf

Expand All @@ -36,12 +37,14 @@ object OptionUtils extends SQLConfHelper with Logging {

private val PAIMON_OPTION_PREFIX = "spark.paimon."
private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."
private val PAIMON_SPARK_SESSION_EXTENSIONS =
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"

def paimonExtensionEnabled: Boolean = {
conf
def paimonExtensionEnabled(sparkSession: SparkSession): Boolean = {
sparkSession.sessionState.conf
.getConf(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
.getOrElse(Seq.empty)
.contains("org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
.contains(PAIMON_SPARK_SESSION_EXTENSIONS)
}

def getOptionString(option: ConfigOption[_]): String = {
Expand All @@ -63,15 +66,22 @@ object OptionUtils extends SQLConfHelper with Logging {
}
}

def checkRequiredConfigurations(): Unit = {
if (getOptionString(SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED).toBoolean) {
if (!paimonExtensionEnabled) {
throw new RuntimeException(
"""
|When using Paimon, it is necessary to configure `spark.sql.extensions` and ensure that it includes `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
|You can disable this check by configuring `spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly discouraged to do so.
|""".stripMargin)
}
/**
 * Whether the required-Spark-confs check is switched on for the given session.
 *
 * Reads the session's own conf (not the thread-local `SQLConf.get`) under the
 * `spark.paimon.` prefix, falling back to the option's declared default when
 * the user has not set it explicitly.
 */
private def requiredSparkConfsCheckEnabled(sparkSession: SparkSession): Boolean = {
  val checkOption = SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED
  // Full session-conf key, e.g. "spark.paimon.requiredSparkConfsCheck.enabled".
  val confKey = s"$PAIMON_OPTION_PREFIX${checkOption.key()}"
  val fallback = checkOption.defaultValue().toString
  sparkSession.sessionState.conf.getConfString(confKey, fallback).toBoolean
}

/**
 * Fails fast when the Paimon Spark session extensions are not registered.
 *
 * The check only fires when the `spark.paimon.requiredSparkConfsCheck.enabled`
 * switch is on for this session; otherwise it is a no-op.
 *
 * @throws RuntimeException if the check is enabled and
 *                          `spark.sql.extensions` does not include the Paimon extensions
 */
def checkRequiredConfigurations(sparkSession: SparkSession): Unit = {
  // Skip entirely when the user has opted out of the check.
  if (!requiredSparkConfsCheckEnabled(sparkSession)) {
    return
  }
  if (!paimonExtensionEnabled(sparkSession)) {
    throw new RuntimeException(
      """
        |When using Paimon, it is necessary to configure `spark.sql.extensions` and ensure that it includes `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
        |You can disable this check by configuring `spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly discouraged to do so.
        |""".stripMargin)
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.FileStoreTableFactory

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.paimon.Utils
import org.junit.jupiter.api.Assertions

Expand Down Expand Up @@ -265,4 +267,41 @@ class PaimonConfigCheckTest extends SparkFunSuite {
}
}
}

// Verifies that checkRequiredConfigurations reads the *session's* conf rather
// than the thread-local SQLConf: the extensions are registered on the session,
// so the check must pass even while an empty SQLConf is swapped in for this
// thread via SQLConf.withExistingConf.
test("Paimon Option: required confs check with temporary SQLConf") {
  val spark = SparkSession
    .builder()
    .master("local[2]")
    .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
    .config("spark.sql.catalog.paimon.warehouse", Utils.createTempDir.getCanonicalPath)
    // Register the Paimon extensions — this is what the check looks for.
    .config(
      "spark.sql.extensions",
      "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
    // Explicitly enable the check so this test exercises the enforcing path.
    .config("spark.paimon.requiredSparkConfsCheck.enabled", "true")
    .getOrCreate()
  try {
    // A fresh, empty SQLConf is active for this thread inside the block;
    // the check must still succeed (no exception expected).
    SQLConf.withExistingConf(new SQLConf) {
      OptionUtils.checkRequiredConfigurations(spark)
    }
  } finally {
    // Tear the session down so later tests build their own from scratch.
    spark.close()
  }
}

// Verifies the opt-out switch: with `spark.paimon.requiredSparkConfsCheck.enabled`
// set to false and NO Paimon extensions configured, checkRequiredConfigurations
// must be a no-op — even while an empty thread-local SQLConf is active.
test("Paimon Option: required confs switch with temporary SQLConf") {
  val spark = SparkSession
    .builder()
    .master("local[2]")
    .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
    .config("spark.sql.catalog.paimon.warehouse", Utils.createTempDir.getCanonicalPath)
    // Note: `spark.sql.extensions` is deliberately NOT set here; only the
    // disabled switch keeps the check from throwing.
    .config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
    .getOrCreate()
  try {
    // Must not throw: the switch is read from the session conf, not from the
    // (empty) thread-local SQLConf swapped in by withExistingConf.
    SQLConf.withExistingConf(new SQLConf) {
      OptionUtils.checkRequiredConfigurations(spark)
    }
  } finally {
    // Always release the session so subsequent tests start clean.
    spark.close()
  }
}
}