Skip to content

feat: implement spill callback for cross-task memory eviction [experimental]#3869

Draft
andygrove wants to merge 12 commits intoapache:mainfrom
andygrove:memory-profiling-tools
Draft

feat: implement spill callback for cross-task memory eviction [experimental]#3869
andygrove wants to merge 12 commits intoapache:mainfrom
andygrove:memory-profiling-tools

Conversation

@andygrove
Copy link
Copy Markdown
Member

@andygrove andygrove commented Apr 1, 2026

Which issue does this PR close?

Related to reducing Comet's off-heap memory requirements for large-scale workloads.

Rationale for this change

When running TPC-H at SF100+ scale, Comet requires significantly more off-heap memory than Spark alone. Benchmarking on TPC-H SF100 with local[4] showed:

Config Q1 (agg) Q5 (5-way join) Q9 (6-way join)
Spark 4g offHeap 2700 MB 5167 MB 4580 MB
Comet 4g offHeap 679 MB 5534 MB 5911 MB
Comet 8g offHeap 665 MB 5440 MB 6359 MB

Q9 showed elastic memory growth (450 MB increase from 4g to 8g offHeap), traced to shuffle writer greedy buffering. The root cause is that Comet's NativeMemoryConsumer.spill() returned 0, preventing Spark from reclaiming memory across tasks.

What changes are included in this PR?

Spill callback implementation

  • SpillState (native/core/src/execution/memory_pools/spill.rs): Shared state with atomics and condvar for coordinating spill requests between Spark's memory manager thread and DataFusion's operator threads.
  • CometUnifiedMemoryPool: Checks SpillState.pressure() in try_grow() — returns ResourcesExhausted when spill pressure is active, triggering DataFusion's Sort/Aggregate/Shuffle operators to spill internally. Tracks freed bytes in shrink().
  • CometTaskMemoryManager.spill(): Now forwards spill requests to native via JNI requestSpill() instead of returning 0. Waits up to 10 seconds for operators to react.
  • CometExecIterator: Wires the native plan handle to CometTaskMemoryManager after creation, clears it before releasePlan to prevent callbacks after destruction.

Memory profiling tools

  • benchmarks/tpc/memory-profile.sh: Script that runs TPC-H queries individually under different configurations (Spark-only, Comet with varying offHeap sizes) in local mode, capturing peak RSS via /usr/bin/time -l.
  • docs/source/contributor-guide/memory-management.md: Analysis of Comet's memory management architecture, benchmark results, comparison with Gluten's approach, and documentation of the spill callback design.

How are these changes tested?

Benchmarked with TPC-H SF100 comparing peak RSS before and after the change. The spill callback is exercised whenever Spark's memory manager calls spill() on Comet's NativeMemoryConsumer, which happens when multiple concurrent tasks compete for the shared off-heap pool. Existing test suites cover the execution paths (Sort, Aggregate, Shuffle) that react to ResourcesExhausted by spilling.

Add benchmarking script to measure peak RSS per TPC-H query under
different Spark/Comet configurations and off-heap memory sizes.
Includes analysis document investigating why Comet requires more
off-heap memory than expected.
@andygrove andygrove changed the title feat: add memory profiling tools for off-heap analysis feat: add memory profiling tools for off-heap analysis [experimental] Apr 1, 2026
@andygrove andygrove changed the title feat: add memory profiling tools for off-heap analysis [experimental] feat: implement spill callback for cross-task memory eviction Apr 1, 2026
local[8] benchmark shows Comet memory usage now on par with Spark
(8525 vs 8476 MB) and elastic growth eliminated.
@andygrove andygrove changed the title feat: implement spill callback for cross-task memory eviction feat: implement spill callback for cross-task memory eviction [experimental] Apr 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant