Skip to content

fix: Native_datafusion reports correct files and bytes scanned#3798

Merged
comphead merged 2 commits intoapache:mainfrom
0lai0:reports_scanned_twice
Apr 2, 2026
Merged

fix: Native_datafusion reports correct files and bytes scanned#3798
comphead merged 2 commits intoapache:mainfrom
0lai0:reports_scanned_twice

Conversation

@0lai0
Copy link
Copy Markdown
Contributor

@0lai0 0lai0 commented Mar 26, 2026

Which issue does this PR close?

Closes #3791

Rationale for this change

In CometScanExec, calling getFilePartitions() unconditionally executes sendDriverMetrics(). Because getFilePartitions() can be evaluated multiple times during planning (e.g., converting to CometNativeScanExec) and execution (e.g., fetching partitions), the SQLMetric accumulators like numFiles and filesSize were being duplicated. This led to incorrect double-counted values rendering in the Spark UI.

What changes are included in this PR?

  • Replaced metrics(...).add() with metrics(...).set() in CometScanExec to ensure idempotency when reporting metrics.
  • Wrapped the driver metric updates and Spark listener event dispatching inside a lazy val. This prevents both double-counting during Catalyst transformations (makeCopy) and sending redundant UI events.

How are these changes tested?

  • Added a dedicated end-to-end unit test in CometExecSuite.
  • The test writes a dummy Parquet dataset, sequentially triggers multiple UI actions (count and collect) to force severe plan evaluations, and strictly asserts that numFiles is exactly 2 without any duplication.

Copy link
Copy Markdown
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @0lai0.

@andygrove andygrove requested a review from comphead March 26, 2026 13:07
Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically it's an artifact of wrapping CometNativeScan in CometScan, which we hopefully won't do in the future anyway.

Thanks for the fix in the meantime, @0lai0!

Copy link
Copy Markdown
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @0lai0 I'll quickly check it out today

Copy link
Copy Markdown
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow on UI I can now see 0

number of files read: 0
size of files read: 0.0 B

spark.range(100).repartition(2).write.mode("overwrite").parquet(path)

withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include --conf spark.comet.scan.impl=native_datafusion

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @comphead for review. I added this to the latest commit.

@0lai0
Copy link
Copy Markdown
Contributor Author

0lai0 commented Mar 27, 2026

Thank you all for the feedback. I’ll investigate this matter and fix it.

@mbutrovich
Copy link
Copy Markdown
Contributor

mbutrovich commented Mar 30, 2026

Marking this as draft so we don't accidentally merge it, feel free to flip it back when it's ready for another look. Thanks @0lai0!

@mbutrovich mbutrovich marked this pull request as draft March 30, 2026 16:00
@0lai0
Copy link
Copy Markdown
Contributor Author

0lai0 commented Mar 31, 2026

Thanks @mbutrovich . I'm still investigating the issue and trying to reproduce the scenario, but I haven't identified the root cause yet.

@0lai0
Copy link
Copy Markdown
Contributor Author

0lai0 commented Apr 2, 2026

Thank you all for the review.
I've updated the test to strictly use spark.comet.scan.impl=native_datafusion as requested.

After checking further, I've simplified the fix by changing metrics.add to metrics.set. This ensures idempotency: if Catalyst evaluates the metric multiple times, it updates to the same fixed value rather than accumulating (which would cause double-counting) or resetting to zero.

@0lai0 0lai0 marked this pull request as ready for review April 2, 2026 09:05
@0lai0
Copy link
Copy Markdown
Contributor Author

0lai0 commented Apr 2, 2026

This is the show on UI.

spark.range(10000000).repartition(20).write.parquet(location) 
spark.read.parquet("location").show(false)

But I'm not sure whether it is the correct snapshot.

Screenshot 2026-04-02 at 5 14 46 PM

@0lai0 0lai0 requested a review from comphead April 2, 2026 09:18
@comphead
Copy link
Copy Markdown
Contributor

comphead commented Apr 2, 2026

Checking, btw output_rows looks already also fixed in #3842

testing num Files

}

test("Native_datafusion reports correct files and bytes scanned") {
withTempDir { dir =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
withTempDir { dir =>
val inputFiles = 2
withTempDir { dir =>

test("Native_datafusion reports correct files and bytes scanned") {
withTempDir { dir =>
val path = new java.io.File(dir, "test_metrics").getAbsolutePath
spark.range(100).repartition(2).write.mode("overwrite").parquet(path)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
spark.range(100).repartition(2).write.mode("overwrite").parquet(path)
spark.range(100).repartition(inputFiles).write.mode("overwrite").parquet(path)


val numFiles = scanNode.metrics("numFiles").value
assert(
numFiles == 2,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
numFiles == 2,
numFiles == inputFiles,

val numFiles = scanNode.metrics("numFiles").value
assert(
numFiles == 2,
s"Expected exactly 2 files to be scanned, but got metrics reporting $numFiles")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s"Expected exactly 2 files to be scanned, but got metrics reporting $numFiles")
s"Expected exactly $inputFiles files to be scanned, but got metrics reporting $numFiles")

Copy link
Copy Markdown
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @0lai0 I can see correct numbers now, please polish a test a little bit and this PR is good to go

@comphead comphead merged commit 423b572 into apache:main Apr 2, 2026
158 checks passed
@comphead
Copy link
Copy Markdown
Contributor

comphead commented Apr 2, 2026

I went ahead with the merge to test it sooner

@0lai0
Copy link
Copy Markdown
Contributor Author

0lai0 commented Apr 3, 2026

Hi @comphead , apologies for the late reply! Thanks for taking care of the merge to test it sooner. I'll open a quick follow-up PR shortly to polish the test as you suggested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

native_datafusion reports twice more files and bytes scanned

4 participants