Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,14 @@ private <T> T valueConverter(String value, Class<T> valueClass) {
throw new IllegalArgumentException("Unsupported default value type: " + valueClass.getName());
}
}

/** Returns the Spark-prefixed configuration key for the given option.
* SQL conf keys are stored without the "spark." prefix but require it at runtime.
*/
public static String sparkKey(ConfigOption<?> option) {
String key = option.key();
return key.startsWith(SPARK_PREFIX) ? key : SPARK_PREFIX + key;
}
Comment thread
guixiaowen marked this conversation as resolved.
}

class SparkContextOption<T> extends ConfigOption<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ import org.apache.spark.sql.types.ShortType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType

import org.apache.auron.configuration.AuronConfiguration
import org.apache.auron.configuration.{AuronConfiguration, ConfigOption}
import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.EmptyPartitionsExecNode
Expand Down Expand Up @@ -322,54 +322,57 @@ object AuronConverters extends Logging {
}
}

private def conversionDisabledMsg[T](option: ConfigOption[T]): String =
s"Conversion disabled: ${SparkAuronConfiguration.sparkKey(option)}=false."

private def addNeverConvertReasonTag(exec: SparkPlan) = {
val neverConvertReason =
exec match {
case _: FileSourceScanExec if !enableScan =>
"Conversion disabled: spark.auron.enable.scan=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN)
case _: ProjectExec if !enableProject =>
"Conversion disabled: spark.auron.enable.project=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_PROJECT)
case _: FilterExec if !enableFilter =>
"Conversion disabled: spark.auron.enable.filter=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_FILTER)
case _: SortExec if !enableSort =>
"Conversion disabled: spark.auron.enable.sort=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SORT)
case _: UnionExec if !enableUnion =>
"Conversion disabled: spark.auron.enable.union=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_UNION)
case _: SortMergeJoinExec if !enableSmj =>
"Conversion disabled: spark.auron.enable.smj=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SMJ)
case _: ShuffledHashJoinExec if !enableShj =>
"Conversion disabled: spark.auron.enable.shj=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SHJ)
case _: BroadcastHashJoinExec if !enableBhj =>
"Conversion disabled: spark.auron.enable.bhj=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BHJ)
case _: BroadcastNestedLoopJoinExec if !enableBnlj =>
"Conversion disabled: spark.auron.enable.bnlj=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_BNLJ)
case _: LocalLimitExec if !enableLocalLimit =>
"Conversion disabled: spark.auron.enable.local.limit=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_LIMIT)
case _: GlobalLimitExec if !enableGlobalLimit =>
"Conversion disabled: spark.auron.enable.global.limit=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT)
case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject =>
"Conversion disabled: spark.auron.enable.take.ordered.and.project=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT)
case _: CollectLimitExec if !enableCollectLimit =>
"Conversion disabled: spark.auron.enable.collectLimit=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_COLLECT_LIMIT)
case _: HashAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR)
case _: ObjectHashAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR)
case _: SortAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_AGGR)
case _: ExpandExec if !enableExpand =>
"Conversion disabled: spark.auron.enable.expand=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_EXPAND)
case _: WindowExec if !enableWindow =>
"Conversion disabled: spark.auron.enable.window=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW)
case _: UnaryExecNode
if exec.getClass.getSimpleName == "WindowGroupLimitExec" && !enableWindowGroupLimit =>
"Conversion disabled: spark.auron.enable.window.group.limit=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT)
case _: GenerateExec if !enableGenerate =>
"Conversion disabled: spark.auron.enable.generate=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_GENERATE)
case _: LocalTableScanExec if !enableLocalTableScan =>
"Conversion disabled: spark.auron.enable.local.table.scan=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN)
case _: DataWritingCommandExec if !enableDataWriting =>
"Conversion disabled: spark.auron.enable.data.writing=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_DATA_WRITING)
case _ =>
s"${exec.getClass.getSimpleName} is not supported yet."
}
Expand All @@ -396,9 +399,9 @@ object AuronConverters extends Logging {
exec match {
case _: FileSourceScanExec if enableScan =>
if (!enableScanParquet) {
"Conversion disabled: spark.auron.enable.scan.parquet=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_PARQUET)
} else if (!enableScanOrc) {
"Conversion disabled: spark.auron.enable.scan.orc=false."
conversionDisabledMsg(SparkAuronConfiguration.ENABLE_SCAN_ORC)
} else {
s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}"
}
Expand Down Expand Up @@ -492,7 +495,7 @@ object AuronConverters extends Logging {
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier
.getOrElse("unknown")}. " +
"Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " +
s"Set spark.${SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.key()}=true to enable timestamp support " +
"or remove timestamp columns from the query.")
}
addRenameColumnsExec(Shims.get.createNativeParquetScanExec(exec))
Expand All @@ -503,7 +506,7 @@ object AuronConverters extends Logging {
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier
.getOrElse("unknown")}. " +
"Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " +
s"Set spark.${SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.key()}=true to enable timestamp support " +
"or remove timestamp columns from the query.")
}
addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu
if (AuronConverters.enableShuffleExechange) {
assert(
AuronConverters.supportedShuffleManager,
"spark.auron.enable.shuffleExchange=true requires an Auron shuffle manager. " +
s"spark.${SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE.key()}=true requires an Auron shuffle manager. " +
"Set spark.shuffle.manager to one of: " +
"org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager, " +
"org.apache.spark.sql.execution.auron.shuffle.AuronUniffleShuffleManager, " +
Expand Down
Loading