diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 2f1fcc2b3..87d93189a 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -616,6 +616,20 @@ private T valueConverter(String value, Class valueClass) { throw new IllegalArgumentException("Unsupported default value type: " + valueClass.getName()); } } + + /** + * Returns the Spark-prefixed configuration key for the given option. + * + * SQL conf keys are stored without the "spark." prefix but require it at runtime. + * Keys that already start with {@code SPARK_PREFIX} are returned unchanged. + * + * @param option the option whose key is resolved via {@code option.key()} + * @return the key, prefixed with {@code SPARK_PREFIX} unless it already starts with it + */ + public static String sparkKey(ConfigOption option) { + String key = option.key(); + return key.startsWith(SPARK_PREFIX) ? key : SPARK_PREFIX + key; + } } class SparkContextOption extends ConfigOption { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index deb3c4f3f..502998ee3 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -84,7 +84,7 @@ import org.apache.spark.sql.types.ShortType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructType -import org.apache.auron.configuration.AuronConfiguration +import org.apache.auron.configuration.{AuronConfiguration, ConfigOption} import org.apache.auron.jni.AuronAdaptor import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.EmptyPartitionsExecNode @@ -322,54 +322,58 @@ object AuronConverters extends Logging { } } + /** Builds the "Conversion disabled" reason for an option, using its Spark-prefixed key. */ + private def conversionDisabledMsg[T](option: ConfigOption[T]): String = + s"Conversion disabled: 
${SparkAuronConfiguration.sparkKey(option)}=false." + private def addNeverConvertReasonTag(exec: SparkPlan) = { val neverConvertReason = exec match { case _: FileSourceScanExec if !enableScan => - "Conversion disabled: spark.auron.enable.scan=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN) case _: ProjectExec if !enableProject => - "Conversion disabled: spark.auron.enable.project=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_PROJECT) case _: FilterExec if !enableFilter => - "Conversion disabled: spark.auron.enable.filter=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_FILTER) case _: SortExec if !enableSort => - "Conversion disabled: spark.auron.enable.sort=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SORT) case _: UnionExec if !enableUnion => - "Conversion disabled: spark.auron.enable.union=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_UNION) case _: SortMergeJoinExec if !enableSmj => - "Conversion disabled: spark.auron.enable.smj=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SMJ) case _: ShuffledHashJoinExec if !enableShj => - "Conversion disabled: spark.auron.enable.shj=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SHJ) case _: BroadcastHashJoinExec if !enableBhj => - "Conversion disabled: spark.auron.enable.bhj=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BHJ) case _: BroadcastNestedLoopJoinExec if !enableBnlj => - "Conversion disabled: spark.auron.enable.bnlj=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BNLJ) case _: LocalLimitExec if !enableLocalLimit => - "Conversion disabled: spark.auron.enable.local.limit=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_LIMIT) case _: GlobalLimitExec if !enableGlobalLimit => - "Conversion disabled: spark.auron.enable.global.limit=false."
+ conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT) case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject => - "Conversion disabled: spark.auron.enable.take.ordered.and.project=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT) case _: CollectLimitExec if !enableCollectLimit => - "Conversion disabled: spark.auron.enable.collectLimit=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_COLLECT_LIMIT) case _: HashAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: ObjectHashAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: SortAggregateExec if !enableAggr => - "Conversion disabled: spark.auron.enable.aggr=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR) case _: ExpandExec if !enableExpand => - "Conversion disabled: spark.auron.enable.expand=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_EXPAND) case _: WindowExec if !enableWindow => - "Conversion disabled: spark.auron.enable.window=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW) case _: UnaryExecNode if exec.getClass.getSimpleName == "WindowGroupLimitExec" && !enableWindowGroupLimit => - "Conversion disabled: spark.auron.enable.window.group.limit=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT) case _: GenerateExec if !enableGenerate => - "Conversion disabled: spark.auron.enable.generate=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GENERATE) case _: LocalTableScanExec if !enableLocalTableScan => - "Conversion disabled: spark.auron.enable.local.table.scan=false."
+ conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN) case _: DataWritingCommandExec if !enableDataWriting => - "Conversion disabled: spark.auron.enable.data.writing=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_DATA_WRITING) case _ => s"${exec.getClass.getSimpleName} is not supported yet." } @@ -396,9 +400,9 @@ object AuronConverters extends Logging { exec match { case _: FileSourceScanExec if enableScan => if (!enableScanParquet) { - "Conversion disabled: spark.auron.enable.scan.parquet=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_PARQUET) } else if (!enableScanOrc) { - "Conversion disabled: spark.auron.enable.scan.orc=false." + conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_ORC) } else { s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}" } @@ -492,7 +496,7 @@ object AuronConverters extends Logging { !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier .getOrElse("unknown")}. " + - "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + + s"Set ${SparkAuronConfiguration.sparkKey(SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP)}=true to enable timestamp support " + "or remove timestamp columns from the query.") } addRenameColumnsExec(Shims.get.createNativeParquetScanExec(exec)) @@ -503,7 +507,7 @@ object AuronConverters extends Logging { !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier .getOrElse("unknown")}. 
" + - "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + + s"Set ${SparkAuronConfiguration.sparkKey(SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP)}=true to enable timestamp support " + "or remove timestamp columns from the query.") } addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index b68b04954..03eda0ce9 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -74,7 +74,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu if (AuronConverters.enableShuffleExechange) { assert( AuronConverters.supportedShuffleManager, - "spark.auron.enable.shuffleExchange=true requires an Auron shuffle manager. " + + s"${SparkAuronConfiguration.sparkKey(SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE)}=true requires an Auron shuffle manager. " + "Set spark.shuffle.manager to one of: " + "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager, " + "org.apache.spark.sql.execution.auron.shuffle.AuronUniffleShuffleManager, " +