From d2839d34c2efd14bf984a4779b191f7afc93e345 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Mon, 1 Jun 2026 14:00:14 +0800 Subject: [PATCH 1/4] [AURON #2295] Replace hardcoded config strings with typed key() references --- .../spark/sql/auron/AuronConverters.scala | 52 +++++++++---------- .../auron/AuronSparkSessionExtension.scala | 2 +- .../iceberg/IcebergConvertProvider.scala | 2 +- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index deb3c4f3f..994b669c4 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -326,50 +326,50 @@ object AuronConverters extends Logging { val neverConvertReason = exec match { case _: FileSourceScanExec if !enableScan => - "Conversion disabled: spark.auron.enable.scan=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN.key()}=false." case _: ProjectExec if !enableProject => - "Conversion disabled: spark.auron.enable.project=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_PROJECT.key()}=false." case _: FilterExec if !enableFilter => - "Conversion disabled: spark.auron.enable.filter=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_FILTER.key()}=false." case _: SortExec if !enableSort => - "Conversion disabled: spark.auron.enable.sort=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SORT.key()}=false." case _: UnionExec if !enableUnion => - "Conversion disabled: spark.auron.enable.union=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_UNION.key()}=false." case _: SortMergeJoinExec if !enableSmj => - "Conversion disabled: spark.auron.enable.smj=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SMJ.key()}=false." case _: ShuffledHashJoinExec if !enableShj => - "Conversion disabled: spark.auron.enable.shj=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SHJ.key()}=false." case _: BroadcastHashJoinExec if !enableBhj => - "Conversion disabled: spark.auron.enable.bhj=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_BHJ.key()}=false." case _: BroadcastNestedLoopJoinExec if !enableBnlj => - "Conversion disabled: spark.auron.enable.bnlj=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_BNLJ.key()}=false." case _: LocalLimitExec if !enableLocalLimit => - "Conversion disabled: spark.auron.enable.local.limit=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_LOCAL_LIMIT.key()}=false." case _: GlobalLimitExec if !enableGlobalLimit => - "Conversion disabled: spark.auron.enable.global.limit=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT.key()}=false." case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject => - "Conversion disabled: spark.auron.enable.take.ordered.and.project=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.key()}=false." case _: CollectLimitExec if !enableCollectLimit => - "Conversion disabled: spark.auron.enable.collectLimit=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.key()}=false." case _: HashAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." case _: ObjectHashAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." case _: SortAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." case _: ExpandExec if !enableExpand => - "Conversion disabled: spark.auron.enable.expand=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_EXPAND.key()}=false." case _: WindowExec if !enableWindow => - "Conversion disabled: spark.auron.enable.window=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_WINDOW.key()}=false." case _: UnaryExecNode if exec.getClass.getSimpleName == "WindowGroupLimitExec" && !enableWindowGroupLimit => - "Conversion disabled: spark.auron.enable.window.group.limit=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.key()}=false." case _: GenerateExec if !enableGenerate => - "Conversion disabled: spark.auron.enable.generate=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_GENERATE.key()}=false." case _: LocalTableScanExec if !enableLocalTableScan => - "Conversion disabled: spark.auron.enable.local.table.scan=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN.key()}=false." case _: DataWritingCommandExec if !enableDataWriting => - "Conversion disabled: spark.auron.enable.data.writing=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_DATA_WRITING.key()}=false." case _ => s"${exec.getClass.getSimpleName} is not supported yet." } @@ -396,9 +396,9 @@ object AuronConverters extends Logging { exec match { case _: FileSourceScanExec if enableScan => if (!enableScanParquet) { - "Conversion disabled: spark.auron.enable.scan.parquet=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN_PARQUET.key()}=false." } else if (!enableScanOrc) { - "Conversion disabled: spark.auron.enable.scan.orc=false." + s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN_ORC.key()}=false." } else { s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}" } @@ -492,7 +492,7 @@ object AuronConverters extends Logging { !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier .getOrElse("unknown")}. " + - "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + + s"Set spark.${SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.key()}=true to enable timestamp support " + "or remove timestamp columns from the query.") } addRenameColumnsExec(Shims.get.createNativeParquetScanExec(exec)) @@ -503,7 +503,7 @@ object AuronConverters extends Logging { !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier .getOrElse("unknown")}. " + - "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + + s"Set spark.${SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.key()}=true to enable timestamp support " + "or remove timestamp columns from the query.") } addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index b68b04954..03eda0ce9 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -74,7 +74,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu if (AuronConverters.enableShuffleExechange) { assert( AuronConverters.supportedShuffleManager, - "spark.auron.enable.shuffleExchange=true requires an Auron shuffle manager. " + + s"spark.${SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE.key()}=true requires an Auron shuffle manager. " + "Set spark.shuffle.manager to one of: " + "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager, " + "org.apache.spark.sql.execution.auron.shuffle.AuronUniffleShuffleManager, " + diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index a2ef9e3aa..479ad044c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -32,7 +32,7 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { exec match { case _: BatchScanExec => val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() - assert(enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") + assert(enabled, s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.key()}=false.") assert( sparkCompatible, s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") From bc905330c91ce8e9863b3784c5c27aff6742eb35 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Tue, 2 Jun 2026 00:41:09 +0800 Subject: [PATCH 2/4] [AURON #2295] Replace hardcoded config strings with typed key() references --- .../SparkAuronConfiguration.java | 8 +++ .../spark/sql/auron/AuronConverters.scala | 53 ++++++++++--------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 2f1fcc2b3..0567ea794 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -616,6 +616,14 @@ private T valueConverter(String value, Class valueClass) { throw new IllegalArgumentException("Unsupported default value type: " + valueClass.getName()); } } + + /** + * Returns the Spark-prefixed configuration key for the given option. + * SQL conf keys are stored without the "spark." prefix but require it at runtime. + */ + public static String sparkKey(ConfigOption option) { + return SPARK_PREFIX + option.key(); + } } class SparkContextOption extends ConfigOption { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 994b669c4..502998ee3 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -84,7 +84,7 @@ import org.apache.spark.sql.types.ShortType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructType -import org.apache.auron.configuration.AuronConfiguration +import org.apache.auron.configuration.{AuronConfiguration, ConfigOption} import org.apache.auron.jni.AuronAdaptor import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.EmptyPartitionsExecNode @@ -322,54 +322,57 @@ object AuronConverters extends Logging { } } + private def conversionDisabledMsg[T](option: ConfigOption[T]): String = + s"Conversion disabled: ${SparkAuronConfiguration.sparkKey(option)}=false." + private def addNeverConvertReasonTag(exec: SparkPlan) = { val neverConvertReason = exec match { case _: FileSourceScanExec if !enableScan => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN) case _: ProjectExec if !enableProject => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_PROJECT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_PROJECT) case _: FilterExec if !enableFilter => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_FILTER.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_FILTER) case _: SortExec if !enableSort => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SORT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SORT) case _: UnionExec if !enableUnion => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_UNION.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_UNION) case _: SortMergeJoinExec if !enableSmj => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SMJ.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SMJ) case _: ShuffledHashJoinExec if !enableShj => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SHJ.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SHJ) case _: BroadcastHashJoinExec if !enableBhj => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_BHJ.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BHJ) case _: BroadcastNestedLoopJoinExec if !enableBnlj => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_BNLJ.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BNLJ) case _: LocalLimitExec if !enableLocalLimit => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_LOCAL_LIMIT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_LIMIT) case _: GlobalLimitExec if !enableGlobalLimit => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT) case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT) case _: CollectLimitExec if !enableCollectLimit => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_COLLECT_LIMIT) case _: HashAggregateExec if !enableAggr => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: ObjectHashAggregateExec if !enableAggr => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: SortAggregateExec if !enableAggr => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_AGGR.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: ExpandExec if !enableExpand => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_EXPAND.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_EXPAND) case _: WindowExec if !enableWindow => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_WINDOW.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW) case _: UnaryExecNode if exec.getClass.getSimpleName == "WindowGroupLimitExec" && !enableWindowGroupLimit => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT) case _: GenerateExec if !enableGenerate => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_GENERATE.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GENERATE) case _: LocalTableScanExec if !enableLocalTableScan => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN) case _: DataWritingCommandExec if !enableDataWriting => - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_DATA_WRITING.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_DATA_WRITING) case _ => s"${exec.getClass.getSimpleName} is not supported yet." } @@ -396,9 +399,9 @@ object AuronConverters extends Logging { exec match { case _: FileSourceScanExec if enableScan => if (!enableScanParquet) { - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN_PARQUET.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_PARQUET) } else if (!enableScanOrc) { - s"Conversion disabled: spark.${SparkAuronConfiguration.ENABLE_SCAN_ORC.key()}=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_ORC) } else { s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}" } From 3550bae18520172bb66acaf0a07e0e113d2bbf5f Mon Sep 17 00:00:00 2001 From: guihuawen Date: Tue, 2 Jun 2026 19:16:24 +0800 Subject: [PATCH 3/4] [AURON #2295] Replace hardcoded config strings with typed key() references --- .../auron/spark/configuration/SparkAuronConfiguration.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 0567ea794..850716930 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -617,8 +617,7 @@ private T valueConverter(String value, Class valueClass) { } } - /** - * Returns the Spark-prefixed configuration key for the given option. + /** Returns the Spark-prefixed configuration key for the given option. * SQL conf keys are stored without the "spark." prefix but require it at runtime. */ public static String sparkKey(ConfigOption option) { From 566f214f9f6a232f1c37ff6079e3e472cc8d417b Mon Sep 17 00:00:00 2001 From: guihuawen Date: Tue, 2 Jun 2026 21:45:16 +0800 Subject: [PATCH 4/4] [AURON #2295] Replace hardcoded config strings with typed key() references --- .../auron/spark/configuration/SparkAuronConfiguration.java | 3 ++- .../spark/sql/auron/iceberg/IcebergConvertProvider.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 850716930..87d93189a 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -621,7 +621,8 @@ private T valueConverter(String value, Class valueClass) { * SQL conf keys are stored without the "spark." prefix but require it at runtime. */ public static String sparkKey(ConfigOption option) { - return SPARK_PREFIX + option.key(); + String key = option.key(); + return key.startsWith(SPARK_PREFIX) ? key : SPARK_PREFIX + key; } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index 479ad044c..a2ef9e3aa 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -32,7 +32,7 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { exec match { case _: BatchScanExec => val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() - assert(enabled, s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.key()}=false.") + assert(enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") assert( sparkCompatible, s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")