
chore: native_datafusion to report scan task input metrics#3842

Open
comphead wants to merge 12 commits into apache:main from comphead:schema_adapter

Conversation

@comphead
Contributor

@comphead comphead commented Mar 30, 2026

Which issue does this PR close?

Closes #3735

Prerequisites for #3817

Rationale for this change

Problem

When using Comet's native_datafusion scan (CometNativeScanExec), Spark's task-level input metrics (bytesRead, recordsRead) are always zero. These metrics feed the "Input" column in the Spark UI Stages tab and are aggregated by AppStatusListener for job-level reporting.

Standard Spark reports input metrics in FileScanRDD.compute() by reading Hadoop FileSystem thread-local statistics via SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(). Since the native DataFusion scan reads Parquet files entirely in Rust, it never touches Hadoop's Java I/O layer, so those
thread-local counters are never incremented.
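
To illustrate the mechanism, here is a toy model (hypothetical names, not Spark's or Hadoop's actual classes) of the thread-local bytes-read counter and the callback that snapshots it: a read that goes through the JVM I/O layer increments the counter, while a native read bypasses it entirely, so the snapshot stays at zero.

```scala
// Toy model of Hadoop's thread-local FileSystem statistics. Vanilla Spark
// snapshots a thread-local bytes-read counter through a callback (similar in
// spirit to getFSBytesReadOnThreadCallback); a native reader never touches it.
object ThreadLocalStatsDemo {
  private val bytesRead = new ThreadLocal[Long] { override def initialValue(): Long = 0L }

  def hadoopRead(n: Long): Unit = bytesRead.set(bytesRead.get + n) // JVM I/O path
  def nativeRead(n: Long): Long = n                                // native path: counter untouched

  // Returns (counter after a native read, counter after a Hadoop read).
  def run(): (Long, Long) = {
    val readCallback: () => Long = () => bytesRead.get
    nativeRead(4096L)
    val afterNative = readCallback() // still 0: native reads bypass the counter
    hadoopRead(4096L)
    val afterHadoop = readCallback() // now 4096
    (afterNative, afterHadoop)
  }

  def main(args: Array[String]): Unit = println(run()) // (0,4096)
}
```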

What Comet already tracks

The native side already tracks the relevant data:

  • bytes_scanned -- counted in parquet_read_cached_factory.rs via a DataFusion counter metric, incremented on every get_bytes() and get_byte_ranges() call.
  • output_rows -- tracked by DataFusion's ParquetExec.

These flow back to the JVM via CometMetricNode.set_all_from_bytes() and appear as SQL-level metrics in the Spark UI operator details. However, they were never propagated to the task-level TaskMetrics.inputMetrics.

Solution

In the existing TaskCompletionListener inside CometExecRDD.compute(), after closing the iterator, read the final values of bytes_scanned and output_rows from the CometMetricNode tree and set them on TaskContext.taskMetrics().inputMetrics. This adds zero per-batch overhead -- metrics are written once at
task completion.

A findMetric helper on CometMetricNode performs a depth-first search through the metric tree, so it works whether the scan is standalone (CometNativeScanExec creates the RDD directly) or wrapped inside a larger native plan (CometNativeExec with Filter/Project above the scan).
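
The lookup can be sketched with a simplified stand-in for the metric tree (plain Longs here; the real CometMetricNode wraps Spark SQLMetric objects). The demo also shows the shadowing behavior the reviewers discuss: in a Filter above a Scan, the root's output_rows wins over the scan's.

```scala
// Hypothetical simplified model of the metric-tree lookup, not the PR's code.
case class MetricNode(metrics: Map[String, Long], children: Seq[MetricNode]) {
  // Depth-first search: check this node first, then recurse into children.
  def findMetric(name: String): Option[Long] =
    metrics.get(name).orElse(
      children.iterator.map(_.findMetric(name)).collectFirst { case Some(v) => v })
}

object FindMetricDemo {
  // A Filter node wrapping a Scan node, as in a Filter -> Scan native plan.
  val scan = MetricNode(Map("bytes_scanned" -> 1024L, "output_rows" -> 10L), Nil)
  val filter = MetricNode(Map("output_rows" -> 3L), Seq(scan))

  def main(args: Array[String]): Unit = {
    println(filter.findMetric("bytes_scanned")) // Some(1024): found in the child scan
    println(filter.findMetric("output_rows"))   // Some(3): the root's post-filter count wins
  }
}
```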

Changes

  • CometMetricNode.scala -- Added findMetric(name) for depth-first metric lookup in the node tree.
  • CometExecRDD.scala -- In the task completion listener, propagate bytes_scanned and output_rows to inputMetrics.setBytesRead / setRecordsRead.
  • CometTaskMetricsSuite.scala -- Added a test that compares input metrics from a native_datafusion scan against vanilla Spark (Comet disabled). Records must match exactly.
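
The propagation step above can be sketched with stand-in types (FakeInputMetrics and the Map below are hypothetical; the real code reads SQLMetric values in the task completion listener and writes them to TaskContext.taskMetrics().inputMetrics):

```scala
// Stand-in for Spark's InputMetrics, for illustration only.
final class FakeInputMetrics {
  var bytesRead: Long = 0L
  var recordsRead: Long = 0L
  def setBytesRead(v: Long): Unit = bytesRead = v
  def setRecordsRead(v: Long): Unit = recordsRead = v
}

object PropagateDemo {
  // Runs once at task completion, so there is no per-batch overhead.
  def propagate(findMetric: String => Option[Long], im: FakeInputMetrics): Unit = {
    findMetric("bytes_scanned").foreach(im.setBytesRead)
    findMetric("output_rows").foreach(im.setRecordsRead)
  }

  def main(args: Array[String]): Unit = {
    val finalMetrics = Map("bytes_scanned" -> 2048L, "output_rows" -> 100L)
    val im = new FakeInputMetrics
    propagate(finalMetrics.get, im)
    println((im.bytesRead, im.recordsRead)) // (2048,100)
  }
}
```

Note how foreach already handles the absent-metric case, a point raised in the review thread.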

What changes are included in this PR?

How are these changes tested?

@comphead
Contributor Author

[screenshot]

Contributor

@mbutrovich mbutrovich left a comment

Not sure this solution is robust enough.

def findMetric(name: String): Option[SQLMetric] = {
  metrics.get(name).orElse {
    children.iterator.map(_.findMetric(name)).collectFirst { case Some(m) => m }
  }
}
Contributor

Doesn't this just return the first match it finds with the metric name? Can't multiple plans have nodes that have "output_rows"?

Contributor Author

mm, what if we try to restrict output_rows to scan nodes?

val outputRowsMetric = nativeMetrics.findMetric("output_rows")
if (bytesScannedMetric.isDefined || outputRowsMetric.isDefined) {
  val inputMetrics = ctx.taskMetrics().inputMetrics
  bytesScannedMetric.foreach(m => inputMetrics.setBytesRead(m.value))
  outputRowsMetric.foreach(m => inputMetrics.setRecordsRead(m.value))
}
Contributor

foreach already handles the None case for finding the metric, so I find wrapping this in if unnecessary. You save ctx.taskMetrics().inputMetrics but the result is oddly-structured conditional logic.

Contributor Author

agree

subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id, sub))

nativeMetrics.metrics
  .get("bytes_scanned")
Member

Before ac6b869 the logic looked into the children (and collected just the first match).
Now it looks only in the root node.
Shouldn't it look into all scan nodes? Or maybe the logic should be moved to CometNativeScanExec#doExecuteColumnar()?

Contributor Author

This is the part @mbutrovich was concerned about: output_rows is too widely used a name across other plan nodes and can be confused with them, so currently I'm using the closest node to find the metrics.

spark.sparkContext.listenerBus.waitUntilEmpty()

withSQLConf(confs: _*) {
  sql("SELECT * FROM tbl").collect()
Member

Suggested change:
- sql("SELECT * FROM tbl").collect()
+ sql("SELECT * FROM tbl WHERE _1 > 5000").collect()

add a filter to make it more realistic

Contributor Author

Thanks @martin-g, but why would the filter be needed? I'd prefer to keep the repro as simple as possible.

Contributor

A filter would show the discrepancy/incorrect values when the scan isn't the first child node.

Comment on lines +104 to +105
val (cometBytes, cometRecords) = collectInputMetrics(
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION)
Member

Suggested change:
- val (cometBytes, cometRecords) = collectInputMetrics(
-   CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION)
+ val (cometBytes, cometRecords) = collectInputMetrics(
+   CometConf.COMET_ENABLED.key -> "true",
+   CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION)

Contributor Author

CometConf.COMET_ENABLED.key -> "true" is enabled at the test level by default, but I think we should ensure the Comet operators were actually applied.

Contributor

@mbutrovich mbutrovich left a comment

I'm still not sure this works, and a more elaborate test would confirm. The metric propagation is in CometExecRDD.compute(), which runs for all Comet operators, not just native scan. bytes_scanned is safe because it only exists in nativeScanMetrics. But output_rows exists in baselineMetrics too, so for any CometExec plan (e.g., Filter -> Scan), recordsRead gets set to the post-filter count rather than actual records read from storage.

A test with a WHERE clause would expose this since output_rows and actual scan count would diverge.

@martin-g
Member

Right!
This is why I think a possible solution is to move the logic to CometNativeScanExec#doExecuteColumnar(). There a CometExecRDD is instantiated and returned:

You can make it an anonymous class instance like:

new CometExecRDD(...) {
  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val res = super.compute(split, context)

    // new logic here

    res
  }
}

@comphead
Contributor Author

comphead commented Mar 31, 2026

Right! This is why I think a possible solution is to move the logic to CometNativeScanExec#doExecuteColumnar().

This is actually a neat way to isolate scan metrics collection to the scan only.

@comphead
Contributor Author

comphead commented Apr 1, 2026

The code now correctly reports input metrics (rows/bytes) for the native scan node (verified visually); however, the test cannot capture this scenario correctly yet, so I'm working on improving the test.

@comphead
Contributor Author

comphead commented Apr 1, 2026

Visual test:

Non-filtered: [screenshot]

Filtered: [screenshot]

@comphead comphead requested review from martin-g and mbutrovich April 2, 2026 00:20
@comphead
Contributor Author

comphead commented Apr 2, 2026

@mbutrovich @martin-g PTAL

The output_rows for filtered queries now shows the same values as Spark.

  encryptedFilePaths: Seq[String] = Seq.empty,
- shuffleScanIndices: Set[Int] = Set.empty)
+ shuffleScanIndices: Set[Int] = Set.empty,
+ hasNativeScan: Boolean = false)
Contributor

I see it being passed in, but is hasNativeScan ever used?

Contributor Author

Good catch, it is a leftover from experimenting.

}
}

// Called via JNI from `comet_metric_node.rs`
Contributor

Is that the only place this will ever be called from? Otherwise I'm not sure the comment is necessary.

Contributor Author

The IDE highlights the method as unused because it is only called via JNI, so it could be accidentally cleaned up. Added a comment to clarify.

@mbutrovich mbutrovich self-requested a review April 2, 2026 16:39
withTempPath { dir =>
  val rng = new scala.util.Random(42)
  spark
    .createDataFrame((0 until totalRows).map(_ => (rng.nextInt(), rng.nextLong())))
Contributor

Can we use a smaller range for the random values, or just a shuffle from 0 to totalRows? That way we'd know exactly the amount of data we should get back. Right now it's likely selecting every row.
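
The suggestion above can be sketched as follows (hypothetical helper, not the PR's test code): generating a deterministic permutation of 0 until totalRows makes the selectivity of a predicate like `_1 > 5000` exactly known, independent of the seed.

```scala
// Sketch of deterministic test data: shuffle 0 until totalRows instead of
// drawing unbounded random ints, so a threshold predicate has a known count.
object DeterministicDataDemo {
  def selectedCount(totalRows: Int, seed: Long, threshold: Int): Int = {
    val rng = new scala.util.Random(seed)
    val values = rng.shuffle((0 until totalRows).toVector) // permutation of 0 .. totalRows-1
    values.count(_ > threshold)                            // count is seed-independent
  }

  def main(args: Array[String]): Unit =
    println(selectedCount(10000, 42L, 5000)) // 4999: exactly the values 5001..9999 pass
}
```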

Contributor

@mbutrovich mbutrovich left a comment

I'm still not convinced of the generality of this approach. CometNativeScanExec is an input source in foreachUntilCometInput (line 598 of operators.scala), so CometNativeExec.doExecuteColumnar() always calls executeColumnar() on it (line 516) and the scan always gets its own CometExecRDD. This means @martin-g's anonymous subclass approach works and might be the cleaner solution since it scopes the metric propagation to scan RDDs without needing the leafNode traversal. leafNode is fragile for branching metric trees (e.g., unions) since it always follows children.head.

@comphead
Contributor Author

comphead commented Apr 3, 2026

I tried the option of overriding compute(), but super.compute() returns a lazy iterator (CometExecIterator). At that point no data has been read yet, so bytes_scanned is still 0. The metrics only get populated as the iterator is consumed by downstream operators, which happens after the compute() override returns.
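
The lazy-iterator issue described above can be reproduced with plain Scala iterators (toy model, no Spark classes): the side-effecting reads happen only as the iterator is consumed, so a metric snapshot taken right after compute() still sees zero.

```scala
// Toy model of why reading metrics right after compute() does not work.
object LazyIteratorDemo {
  var bytesScanned = 0L

  def compute(): Iterator[Int] =
    (1 to 3).iterator.map { i => bytesScanned += 100; i } // counted lazily, per element

  def main(args: Array[String]): Unit = {
    val it = compute()
    println(bytesScanned) // 0: nothing consumed yet, an override reading here sees 0
    it.foreach(_ => ())   // downstream operators drain the iterator
    println(bytesScanned) // 300: metrics are populated only after consumption
  }
}
```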

So I had to apply the logic in CometNativeExec's task completion listener, with a guard so the metrics are applied only for tasks that start with native scans.

CometNativeScanExec.doExecuteColumnar for some reason is not called for native scans, which might warrant a separate investigation ticket (@mbutrovich WDYT?).



Development

Successfully merging this pull request may close these issues.

native_datafusion doesn't report input metrics

3 participants