diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala index 77e4c6632..546c6561d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala @@ -68,9 +68,11 @@ object AuronConvertStrategy extends Logging { case _ => exec.setTagValue(convertibleTag, false) exec.setTagValue(convertStrategyTag, NeverConvert) - exec.setTagValue( - neverConvertReasonTag, - s"${exec.getClass.getSimpleName} is not supported yet.") + if (!exec.getTagValue(neverConvertReasonTag).isDefined) { + exec.setTagValue( + neverConvertReasonTag, + s"${exec.getClass.getSimpleName} is not supported yet.") + } } danglingChildren = newDangling :+ converted } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index deb3c4f3f..b07237988 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -212,9 +212,22 @@ object AuronConverters extends Logging { case e: BroadcastExchangeExec if enableBroadcastExchange => tryConvert(e, convertBroadcastExchangeExec) case e: FileSourceScanExec if enableScan => // scan - extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match { - case Some(provider) => tryConvert(e, provider.convert) - case None => tryConvert(e, convertFileSourceScanExec) + try { + extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match { + case Some(provider) => tryConvert(e, provider.convert) + case None => tryConvert(e, convertFileSourceScanExec) + } + } catch { + case err @ (_: NotImplementedError | _: AssertionError | _: Exception) => + val msg = + Option(err.getMessage) + .getOrElse(err.toString) + .replaceFirst("^assertion failed: ?", "") + exec.setTagValue(convertToNonNativeTag, true) + exec.setTagValue(convertibleTag, false) + exec.setTagValue(convertStrategyTag, NeverConvert) + exec.setTagValue(neverConvertReasonTag, msg) + exec } case e: ProjectExec if enableProject => // project tryConvert(e, convertProjectExec) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala index ceef69c24..ed0b6c055 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala @@ -40,6 +40,10 @@ class HudiConvertProvider extends AuronConvertProvider with Logging { major <- maybeMajor minor <- maybeMinor } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false) + assert( + SparkAuronConfiguration.ENABLE_HUDI_SCAN.get(), + s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_HUDI_SCAN.key()} is false.") + assert(supported, "Conversion disabled: Only supported Spark versions: 3.0 to 3.5.") SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported case _ => false } @@ -49,7 +53,9 @@ class HudiConvertProvider extends AuronConvertProvider with Logging { exec match { case scan: FileSourceScanExec => // Only handle Hudi-backed file scans; other scans fall through. - HudiScanSupport.supportedFileFormat(scan).nonEmpty + val isSupportedFileFormat = HudiScanSupport.supportedFileFormat(scan).nonEmpty + assert(isSupportedFileFormat, "Conversion disabled: FileFormat is not supported.") + isSupportedFileFormat case _ => false } } @@ -59,29 +65,33 @@ class HudiConvertProvider extends AuronConvertProvider with Logging { case scan: FileSourceScanExec => HudiScanSupport.supportedFileFormat(scan) match { case Some(HudiScanSupport.ParquetFormat) => - if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) { - return exec - } + assert( + SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get(), + s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_PARQUET.key()} is false.") // Hudi falls back to Spark when timestamp scanning is disabled. if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) { if (scan.requiredSchema.exists(e => NativeConverters.existTimestampType(e.dataType))) { - return exec + assert(false, "Conversion disabled: Has timestamp type.") } } logDebug(s"Applying native parquet scan for Hudi: ${scan.relation.location}") AuronConverters.addRenameColumnsExec(Shims.get.createNativeParquetScanExec(scan)) case Some(HudiScanSupport.OrcFormat) => - if (!SparkAuronConfiguration.ENABLE_SCAN_ORC.get()) { - return exec - } + assert( + SparkAuronConfiguration.ENABLE_SCAN_ORC.get(), + s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_ORC.key()} is false.") // ORC follows the same timestamp fallback rule as Parquet. + assert( + SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get(), + s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.key()} is false.") if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) { if (scan.requiredSchema.exists(e => NativeConverters.existTimestampType(e.dataType))) { - return exec + assert(false, "Conversion disabled: Has timestamp type.") } } + logDebug(s"Applying native ORC scan for Hudi: ${scan.relation.location}") AuronConverters.addRenameColumnsExec(Shims.get.createNativeOrcScanExec(scan)) case None => exec diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index dbb1f0c46..786c4ea63 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -25,6 +25,8 @@ import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.ExplainUtils.collectFirst import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase @@ -530,4 +532,21 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { assertFilterReferences(filteredScan.dataFilters, "id", "data") } } + + test("hudi: when falls back, check never convert reason") { + withTable("hudi_ts") { + spark.sql("create table hudi_ts (id int, ts timestamp) using hudi") + spark.sql("insert into hudi_ts values (1, timestamp('2026-01-01 00:00:00'))") + val df = spark.sql("select * from hudi_ts") + // Timestamp columns are not supported when native timestamp scanning is disabled. + logFileFormats(df) + df.collect() + val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason") + assert(collectFirst(df.queryExecution.executedPlan) { + case fileSourceScanExec: FileSourceScanExec => + fileSourceScanExec.getTagValue(neverConvertReasonTag) + }.get.get.equals( + "Falling back exec: FileSourceScanExec: assertion failed: Conversion disabled: Has timestamp type.")) + } + } }