Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ object AuronConvertStrategy extends Logging {
case _ =>
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(
neverConvertReasonTag,
s"${exec.getClass.getSimpleName} is not supported yet.")
if (!exec.getTagValue(neverConvertReasonTag).isDefined) {
exec.setTagValue(
neverConvertReasonTag,
s"${exec.getClass.getSimpleName} is not supported yet.")
}
}
danglingChildren = newDangling :+ converted
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,22 @@ object AuronConverters extends Logging {
case e: BroadcastExchangeExec if enableBroadcastExchange =>
tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match {
case Some(provider) => tryConvert(e, provider.convert)
case None => tryConvert(e, convertFileSourceScanExec)
try {
extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match {
case Some(provider) => tryConvert(e, provider.convert)
case None => tryConvert(e, convertFileSourceScanExec)
}
} catch {
case err @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
val msg =
Option(err.getMessage)
.getOrElse(err.toString)
.replaceFirst("^assertion failed: ?", "")
exec.setTagValue(convertToNonNativeTag, true)
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(neverConvertReasonTag, msg)
exec
}
case e: ProjectExec if enableProject => // project
tryConvert(e, convertProjectExec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class HudiConvertProvider extends AuronConvertProvider with Logging {
major <- maybeMajor
minor <- maybeMinor
} yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
assert(
SparkAuronConfiguration.ENABLE_HUDI_SCAN.get(),
s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_HUDI_SCAN.key()} is false.")
assert(supported, "Conversion disabled: Only supported Spark versions: 3.0 to 3.5.")
SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The asserts in isEnabled change the contract of this method: it's supposed to return false when not enabled (so find can skip it), but now it throws instead. The exception is caught upstream in AuronConverters, so it works — but it's fragile: if both isEnabled and isSupported throw, only the first reason is ever recorded. Also, the final SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported is now dead code — it can never be reached.

Consider keeping isEnabled/isSupported as pure predicates (return false) and recording the reason only in convert, where the context is clearer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> The asserts in isEnabled change the contract of this method: it's supposed to return false when not enabled (so find can skip it), but now it throws instead. The exception is caught upstream in AuronConverters, so it works — but it's fragile: if both isEnabled and isSupported throw, only the first reason is ever recorded. Also, the final SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported is now dead code — it can never be reached.
>
> Consider keeping isEnabled/isSupported as pure predicates (return false) and recording the reason only in convert, where the context is clearer.

@yew1eb Thank you very much for the code review.

Regarding isEnabled and isSupported – they are called within AuronConverters. If isEnabled returns false, isSupported should not be called at all, so I believe both methods will not throw exceptions at the same time. The reasons for not being converted are different between isEnabled and isSupported, and those reasons are only known within these methods, which is why I added the assertions there.

May I ask if your suggestion is to keep isEnabled and isSupported as pure predicates that simply return true or false – without handling too many scenarios inside them – and move the logic for checking Spark version and Hudi format type into the convert method instead?

Is my understanding correct? If so, I will make the changes accordingly. I would really appreciate your confirmation.

Thank you again for your guidance!

case _ => false
}
Expand All @@ -49,7 +53,9 @@ class HudiConvertProvider extends AuronConvertProvider with Logging {
exec match {
case scan: FileSourceScanExec =>
// Only handle Hudi-backed file scans; other scans fall through.
HudiScanSupport.supportedFileFormat(scan).nonEmpty
val isSupportedFileFormat = HudiScanSupport.supportedFileFormat(scan).nonEmpty
assert(isSupportedFileFormat, "Conversion disabled: FileFormat is not supported.")
isSupportedFileFormat
case _ => false
}
}
Expand All @@ -59,29 +65,33 @@ class HudiConvertProvider extends AuronConvertProvider with Logging {
case scan: FileSourceScanExec =>
HudiScanSupport.supportedFileFormat(scan) match {
case Some(HudiScanSupport.ParquetFormat) =>
if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) {
return exec
}
assert(
SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get(),
s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_PARQUET.key()} is false.")
// Hudi falls back to Spark when timestamp scanning is disabled.
if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) {
if (scan.requiredSchema.exists(e =>
NativeConverters.existTimestampType(e.dataType))) {
return exec
assert(false, "Conversion disabled: Has timestamp type.")
}
}
logDebug(s"Applying native parquet scan for Hudi: ${scan.relation.location}")
AuronConverters.addRenameColumnsExec(Shims.get.createNativeParquetScanExec(scan))
case Some(HudiScanSupport.OrcFormat) =>
if (!SparkAuronConfiguration.ENABLE_SCAN_ORC.get()) {
return exec
}
assert(
SparkAuronConfiguration.ENABLE_SCAN_ORC.get(),
s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_ORC.key()} is false.")
// ORC follows the same timestamp fallback rule as Parquet.
assert(
SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get(),
s"Conversion disabled: ${SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.key()} is false.")
if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) {
Comment on lines 80 to 88
if (scan.requiredSchema.exists(e =>
NativeConverters.existTimestampType(e.dataType))) {
return exec
assert(false, "Conversion disabled: Has timestamp type.")
}
}

logDebug(s"Applying native ORC scan for Hudi: ${scan.relation.location}")
AuronConverters.addRenameColumnsExec(Shims.get.createNativeOrcScanExec(scan))
case None => exec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.ExplainUtils.collectFirst
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
Expand Down Expand Up @@ -530,4 +532,21 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
assertFilterReferences(filteredScan.dataFilters, "id", "data")
}
}

test("hudi: when falls back, check never convert reason") {
  // Verifies that a Hudi scan left unconverted carries a never-convert reason tag
  // on its FileSourceScanExec node in the executed plan.
  withTable("hudi_ts") {
    spark.sql("create table hudi_ts (id int, ts timestamp) using hudi")
    spark.sql("insert into hudi_ts values (1, timestamp('2026-01-01 00:00:00'))")
    val df = spark.sql("select * from hudi_ts")
    // The expected reason below is the timestamp-column fallback. This test does not set
    // any timestamp-scan option itself, so it presumably relies on the suite's
    // configuration having native timestamp scanning disabled -- TODO confirm.
    logFileFormats(df)
    df.collect()

    // Tag is looked up by name; it must match the name used by the converter.
    val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
    val expectedReason =
      "Falling back exec: FileSourceScanExec: assertion failed: Conversion disabled: Has timestamp type."

    // Locate the first FileSourceScanExec; fail with a clear message instead of a
    // NoSuchElementException when the plan does not contain one.
    val scanOpt = collectFirst(df.queryExecution.executedPlan) {
      case scan: FileSourceScanExec => scan
    }
    assert(scanOpt.isDefined, "expected a FileSourceScanExec in the executed plan")

    // Read the tag without Option.get so a missing tag is reported as an assertion failure.
    val reasonOpt = scanOpt.flatMap(_.getTagValue(neverConvertReasonTag))
    assert(
      reasonOpt.contains(expectedReason),
      s"unexpected never-convert reason on FileSourceScanExec: $reasonOpt")
  }
}
}
Loading