From 61db994c2e858bec33a80e44662e698317b3c4da Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 06:06:29 -0600 Subject: [PATCH 01/12] feat: add memory profiling script and analysis for off-heap usage Add benchmarking script to measure peak RSS per TPC-H query under different Spark/Comet configurations and off-heap memory sizes. Includes analysis document investigating why Comet requires more off-heap memory than expected. --- benchmarks/tpc/memory-profile.sh | 186 ++++++++++++++++++++++ docs/memory-analysis.md | 258 +++++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+) create mode 100755 benchmarks/tpc/memory-profile.sh create mode 100644 docs/memory-analysis.md diff --git a/benchmarks/tpc/memory-profile.sh b/benchmarks/tpc/memory-profile.sh new file mode 100755 index 0000000000..461fa65a9a --- /dev/null +++ b/benchmarks/tpc/memory-profile.sh @@ -0,0 +1,186 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Memory profiling script for TPC-H queries. +# Runs each query under different configurations and records peak RSS. 
+# +# Usage: +# ./memory-profile.sh [--queries "1 5 9"] [--offheap-sizes "4g 8g 16g"] +# [--cores 4] [--data /path/to/tpch] +# +# Requires: SPARK_HOME, COMET_JAR (or builds from source) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" + +# Defaults +QUERIES="${QUERIES:-$(seq 1 22)}" +OFFHEAP_SIZES="${OFFHEAP_SIZES:-4g 8g 16g}" +LOCAL_CORES="${LOCAL_CORES:-4}" +TPCH_DATA="${TPCH_DATA:-/opt/tpch/sf100}" +SPARK_HOME="${SPARK_HOME:-/opt/spark-3.5.8-bin-hadoop3}" +DRIVER_MEMORY="${DRIVER_MEMORY:-8g}" +OUTPUT_DIR="${OUTPUT_DIR:-$SCRIPT_DIR/memory-profile-results}" + +# Find Comet JAR +if [ -z "${COMET_JAR:-}" ]; then + COMET_JAR=$(ls "$REPO_ROOT"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar 2>/dev/null \ + | grep -v sources | grep -v test | head -1) + if [ -z "$COMET_JAR" ]; then + echo "Error: No Comet JAR found. Set COMET_JAR or run 'make'." + exit 1 + fi +fi + +# Parse args +while [[ $# -gt 0 ]]; do + case $1 in + --queries) QUERIES="$2"; shift 2 ;; + --offheap-sizes) OFFHEAP_SIZES="$2"; shift 2 ;; + --cores) LOCAL_CORES="$2"; shift 2 ;; + --data) TPCH_DATA="$2"; shift 2 ;; + --output) OUTPUT_DIR="$2"; shift 2 ;; + --driver-memory) DRIVER_MEMORY="$2"; shift 2 ;; + *) echo "Unknown option: $1"; exit 1 ;; + esac +done + +mkdir -p "$OUTPUT_DIR" +RESULTS_FILE="$OUTPUT_DIR/results.csv" +LOG_DIR="$OUTPUT_DIR/logs" +mkdir -p "$LOG_DIR" + +# Ensure spark-events dir exists +mkdir -p /tmp/spark-events + +echo "Configuration:" +echo " SPARK_HOME: $SPARK_HOME" +echo " COMET_JAR: $COMET_JAR" +echo " TPCH_DATA: $TPCH_DATA" +echo " LOCAL_CORES: $LOCAL_CORES" +echo " DRIVER_MEMORY: $DRIVER_MEMORY" +echo " OFFHEAP_SIZES: $OFFHEAP_SIZES" +echo " QUERIES: $QUERIES" +echo " OUTPUT_DIR: $OUTPUT_DIR" +echo "" + +# CSV header +echo "engine,offheap_size,query,peak_rss_mb,wall_time_sec,exit_code" > "$RESULTS_FILE" + +# Run a single query and capture peak RSS +run_query() { + local engine="$1" + local offheap="$2" + 
local query_num="$3" + local label="${engine}-offheap${offheap}-q${query_num}" + local log_file="$LOG_DIR/${label}.log" + local time_file="$LOG_DIR/${label}.time" + + # Common Spark conf + local conf=( + --master "local[${LOCAL_CORES}]" + --driver-memory "$DRIVER_MEMORY" + --conf "spark.memory.offHeap.enabled=true" + --conf "spark.memory.offHeap.size=${offheap}" + --conf "spark.eventLog.enabled=false" + --conf "spark.ui.enabled=false" + ) + + if [ "$engine" = "comet" ]; then + conf+=( + --jars "$COMET_JAR" + --driver-class-path "$COMET_JAR" + --conf "spark.driver.extraClassPath=$COMET_JAR" + --conf "spark.plugins=org.apache.spark.CometPlugin" + --conf "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" + --conf "spark.comet.scan.impl=native_datafusion" + --conf "spark.comet.expression.Cast.allowIncompatible=true" + ) + fi + + conf+=( + "$SCRIPT_DIR/tpcbench.py" + --name "$label" + --benchmark tpch + --data "$TPCH_DATA" + --format parquet + --output "$OUTPUT_DIR" + --iterations 1 + --query "$query_num" + ) + + echo -n " $label ... " + + # Run with /usr/bin/time -l, capture stderr (where time writes) separately + local exit_code=0 + /usr/bin/time -l "$SPARK_HOME/bin/spark-submit" "${conf[@]}" \ + > "$log_file" 2> "$time_file" || exit_code=$? 
+ + # Parse peak RSS from /usr/bin/time -l output (macOS format) + local peak_rss_bytes + peak_rss_bytes=$(grep "maximum resident set size" "$time_file" | awk '{print $1}') || true + local peak_rss_mb="N/A" + if [ -n "$peak_rss_bytes" ]; then + peak_rss_mb=$((peak_rss_bytes / 1048576)) + fi + + # Parse wall clock time + local wall_time + wall_time=$(grep "elapsed" "$time_file" | head -1 | awk '{print $1}') || true + # Convert MM:SS.xx or H:MM:SS.xx to seconds + local wall_secs="N/A" + if [ -n "$wall_time" ]; then + wall_secs=$(echo "$wall_time" | awk -F: '{ + if (NF==3) print $1*3600 + $2*60 + $3; + else if (NF==2) print $1*60 + $2; + else print $1 + }') + fi + + if [ "$exit_code" -eq 0 ]; then + echo "RSS=${peak_rss_mb}MB, time=${wall_secs}s" + else + echo "FAILED (exit=$exit_code), RSS=${peak_rss_mb}MB" + fi + + echo "${engine},${offheap},${query_num},${peak_rss_mb},${wall_secs},${exit_code}" >> "$RESULTS_FILE" +} + +# Main loop +echo "=== Running Spark baseline (no Comet) ===" +for q in $QUERIES; do + run_query "spark" "4g" "$q" +done + +echo "" +echo "=== Running Comet with varying offHeap sizes ===" +for offheap in $OFFHEAP_SIZES; do + echo "--- offHeap = $offheap ---" + for q in $QUERIES; do + run_query "comet" "$offheap" "$q" + done + echo "" +done + +echo "=== Results ===" +column -t -s, "$RESULTS_FILE" +echo "" +echo "Full results: $RESULTS_FILE" +echo "Logs: $LOG_DIR/" diff --git a/docs/memory-analysis.md b/docs/memory-analysis.md new file mode 100644 index 0000000000..2247d95e63 --- /dev/null +++ b/docs/memory-analysis.md @@ -0,0 +1,258 @@ +# Comet Off-Heap Memory Analysis: Why Comet OOMs at TPC-DS 1TB + +## Problem + +Comet OOMs running TPC-DS at 1TB scale unless executor offHeap memory is >= 32GB. +Gluten (Velox backend) handles the same workload with ~2GB offHeap. + +## Root Cause: Untracked Memory Bypassing the Memory Pool + +With default settings, Spark uses SortMergeJoin (not HashJoin). 
Comet's hash join +replacement (`spark.comet.exec.replaceSortMergeJoin`) is experimental and disabled +by default. The operators in play — Sort, Aggregate, SortMergeJoin, Shuffle — all +have DataFusion-internal spill support that triggers on `try_grow()` failure. + +**The real problem is not missing spill support.** It's that the majority of memory +allocations bypass the pool entirely, making the pool's backpressure mechanism useless. + +### Untracked Memory Sources + +#### 1. Input Batches from JVM (scan.rs:136-260) — CRITICAL + +When `ScanExec` pulls Arrow batches from JVM via FFI, the resulting arrays are stored +in `self.batch: Arc>>` but **never registered with a +MemoryReservation**. At 1TB scale, this is the bulk of data flowing through the system. + +``` +JVM → FFI_ArrowArray → ArrayData::from_spark() → make_array() → Vec + ^^^ untracked +``` + +#### 2. FFI Array Deep Copies (copy.rs:34-93) + +`copy_array()` and `copy_or_unpack_array()` make deep copies without tracking: + +```rust +pub fn copy_array(array: &dyn Array) -> ArrayRef { + let mut mutable = MutableArrayData::new(vec![&data], false, capacity); + mutable.extend(0, 0, capacity); // allocation not tracked + make_array(mutable.freeze()) +} +``` + +When `arrow_ffi_safe=false`, each incoming batch is duplicated with zero accounting. + +#### 3. Selection Vector Processing (scan.rs:309-370) + +Selection vector filtering creates additional Arrow arrays via FFI + `take()` that +are never tracked. + +#### 4. Shuffle Decompression (shuffle_scan.rs:136-201) + +`read_ipc_compressed()` decompresses full batches without pool tracking. + +#### 5. Window Function Partitions (planner.rs — BoundedWindowAggExec) + +`BoundedWindowAggExec` buffers entire partitions in memory. Not spillable. 
+ +### Why This Causes OOM + +The memory pool's backpressure mechanism works correctly: when `acquire_from_spark()` +returns less than requested (`unified_pool.rs:121-151`), it returns +`ResourcesExhausted`, and Sort/Aggregate catch this and spill. + +But the pool doesn't know about ~80% of actual memory usage. Untracked scan batches, +copies, and decompression buffers consume physical memory invisibly. The pool grants +allocations to Sort/Aggregate while the process is already near OOM from untracked data. + +This explains the 32GB requirement: you need enough headroom for all untracked memory +on top of the tracked pool. + +### Spark spill() Callback is Also a No-Op + +`spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:99-113`: + +```java +private class NativeMemoryConsumer extends MemoryConsumer { + public long spill(long size, MemoryConsumer trigger) { + return 0; // No spilling + } +} +``` + +Even if memory were tracked, Spark can't reclaim it. But fixing the spill callback +alone wouldn't help without fixing the tracking gaps first. + +### Per-Operator Status (Default Config) + +| Operator | Spillable? | Memory Tracked? 
| Notes | +|----------|-----------|----------------|-------| +| SortExec | Yes (DF internal) | Yes | Spills on try_grow() failure | +| AggregateExec | Yes (DF internal) | Yes | Spills on try_grow() failure | +| SortMergeJoinExec | Streams | Yes | Low memory — relies on sorted inputs | +| Shuffle writer | Yes | Yes | Only operator with `with_can_spill(true)` | +| ScanExec (FFI batches) | No | **No** | Primary untracked memory source | +| copy_array / unpack | No | **No** | Doubles batch memory untracked | +| ShuffleScanExec | No | **No** | Decompression buffers untracked | +| BoundedWindowAggExec | No | Partial | Entire partitions buffered | +| BroadcastHashJoin | N/A | No (JVM side) | Build side in JVM heap | + +## Comparison with Gluten + +| Capability | Comet | Gluten | +|-----------|-------|--------| +| Spark spill callback | Returns 0 (no-op) | Hierarchical spill cascade via TreeMemoryConsumer | +| Hash join spill | None | Velox spills hash tables to disk | +| Per-operator tracking | Single NativeMemoryConsumer | Per-operator MemoryTarget children | +| Retry on OOM | None | Multi-retry with exponential backoff + GC | +| Memory isolation | Proportional per-task limit | Hard per-task cap option | + +### Gluten's TreeMemoryConsumer Architecture + +``` +TreeMemoryConsumer (registered as Spark MemoryConsumer) + +-- Child: HashJoin (with Spiller) + +-- Child: HashAggregate (with Spiller) + +-- Child: Shuffle (with Spiller) + +-- Child: Sort (with Spiller) +``` + +When Spark calls `spill()`, Gluten: +1. Walks children sorted by usage (largest first) +2. Calls SHRINK phase (reduce internal buffers) +3. Calls SPILL phase (write to disk) +4. Returns actual bytes freed + +### Gluten's Retry-on-OOM + +`RetryOnOomMemoryTarget` catches allocation failures, triggers a spill cascade +across all operators in the task, then retries. `ThrowOnOomMemoryTarget` wraps +everything with up to 9 retries with exponential backoff. + +## Debugging Steps + +### 1. 
Enable memory debug logging +``` +spark.comet.debug.memory.enabled=true +``` + +### 2. Run targeted TPC-DS queries individually +- Join-heavy (likely OOM first): q5, q14a, q14b, q23a, q23b, q24a, q24b, q72, q95 +- Sort/aggregate-heavy: q1, q4, q11, q74 + +### 3. Disable operator categories to isolate +``` +spark.comet.exec.hashJoin.enabled=false +spark.comet.exec.sortMergeJoin.enabled=false +# or +spark.comet.exec.shuffle.enabled=false +``` + +### 4. Check TrackConsumersPool output on OOM +The error message should list top 10 memory consumers by usage. + +--- + +## Solution Design + +### The Real Fix: Track All Memory Through the Pool + +DataFusion's spill model already works — Sort and Aggregate react to +`ResourcesExhausted` by spilling. The fix is to make the pool aware of all +memory so backpressure triggers at the right time. + +### Priority 1: Track ScanExec Input Batches (Highest Impact) + +**Location:** `native/core/src/execution/operators/scan.rs` + +ScanExec should hold a `MemoryReservation` and call `try_grow()` for each batch +received from the JVM. If the pool denies the allocation, downstream operators +will already be under spill pressure. + +```rust +// Proposed change in ScanExec +struct ScanExec { + // ... existing fields ... + reservation: MemoryReservation, // NEW: track batch memory +} + +impl ScanExec { + fn get_next(&mut self) -> Result { + let batch = self.import_from_jvm()?; + let batch_size = batch.get_array_memory_size(); + self.reservation.try_grow(batch_size)?; // Account for it + // ... store batch ... + } +} +``` + +When the batch is consumed/dropped, `reservation.shrink()` releases the memory. + +This single change would make the pool aware of the largest untracked memory +category and allow backpressure to trigger spill in downstream Sort/Aggregate. + +### Priority 2: Track Array Copies (copy.rs) + +`copy_array()` and `copy_or_unpack_array()` should accept a `&MemoryReservation` +and call `try_grow()` before allocating the copy. 
This requires threading the +reservation through from the calling ScanExec. + +### Priority 3: Track Shuffle Decompression (shuffle_scan.rs) + +`ShuffleScanExec` should hold a `MemoryReservation` and track decompressed batch +sizes, similar to the ScanExec fix. + +### Priority 4: Implement spill() Callback + +After memory tracking is in place, implement a real `spill()` in +`NativeMemoryConsumer` using the shrink-the-pool approach: + +1. When Spark calls `spill(size)`, JNI into native to set a `spill_pressure` atomic +2. Pool's `try_grow()` checks `spill_pressure` and returns `ResourcesExhausted` +3. Sort/Aggregate operators react by spilling internally +4. As operators `shrink()` their reservations, decrement `spill_pressure` +5. Return actual bytes freed to Spark + +This is secondary because fixing tracking (Priorities 1-3) means the pool will +naturally deny allocations when physical memory is scarce, triggering DataFusion's +existing spill without needing Spark coordination. + +### Priority 5: Window Function Spill + +`BoundedWindowAggExec` buffers entire partitions. For large partitions at 1TB, +this needs either: +- Use DataFusion's spillable window variant if available +- Fall back to Spark's window operator above a threshold + +### Comparison with Gluten's Approach + +Gluten solves this differently because Velox owns the entire execution pipeline +including scans. All memory flows through Velox's allocator, which calls back to +Java's `ReservationListener` on every allocation. The `TreeMemoryConsumer` hierarchy +then coordinates spill across operators. + +Comet's challenge is the FFI boundary — Arrow arrays arrive from JVM as raw pointers +with no allocator integration. The arrays are allocated by the JVM's Arrow library, +and Comet's native side just maps them. Tracking must be explicitly added. 
+ +### Implementation Phases + +**Phase 1: Measurement** +- Enable `spark.comet.debug.memory.enabled=true` +- Run TPC-DS 1TB, identify which queries OOM first +- Measure ratio of tracked vs untracked memory + +**Phase 2: Track ScanExec batches (highest impact)** +- Add MemoryReservation to ScanExec +- Track batch sizes on import, release on consumption +- Re-test TPC-DS 1TB — this alone may dramatically reduce memory requirement + +**Phase 3: Track remaining gaps** +- copy.rs array copies +- shuffle_scan.rs decompression buffers +- Selection vector processing + +**Phase 4: Spill callback (if needed)** +- Implement shrink-the-pool mechanism in NativeMemoryConsumer +- Wire through JNI to native pool +- Allows Spark to proactively reclaim memory from native operators From d8ea0acdc40e0b95f2122e03dd3a1c7518cc10d5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 06:07:42 -0600 Subject: [PATCH 02/12] docs: move memory analysis into contributor guide --- docs/source/contributor-guide/index.md | 1 + .../contributor-guide/memory-management.md} | 21 ++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) rename docs/{memory-analysis.md => source/contributor-guide/memory-management.md} (92%) diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md index 0e46f5c821..489d2b3af7 100644 --- a/docs/source/contributor-guide/index.md +++ b/docs/source/contributor-guide/index.md @@ -36,6 +36,7 @@ Adding a New Operator Adding a New Expression Tracing Profiling +Memory Management Spark SQL Tests Iceberg Spark Tests SQL File Tests diff --git a/docs/memory-analysis.md b/docs/source/contributor-guide/memory-management.md similarity index 92% rename from docs/memory-analysis.md rename to docs/source/contributor-guide/memory-management.md index 2247d95e63..9eaf472d5e 100644 --- a/docs/memory-analysis.md +++ b/docs/source/contributor-guide/memory-management.md @@ -1,4 +1,23 @@ -# Comet Off-Heap Memory Analysis: Why Comet OOMs at 
TPC-DS 1TB + + +# Memory Management ## Problem From 73eaf85319755a42f8a8f8ac1a37db4551a2fb9c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 06:54:30 -0600 Subject: [PATCH 03/12] docs: add benchmark results and fix memory-profile.sh time parsing --- benchmarks/tpc/memory-profile.sh | 15 ++---- .../contributor-guide/memory-management.md | 46 +++++++++++++++++-- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/benchmarks/tpc/memory-profile.sh b/benchmarks/tpc/memory-profile.sh index 461fa65a9a..be088f8e0e 100755 --- a/benchmarks/tpc/memory-profile.sh +++ b/benchmarks/tpc/memory-profile.sh @@ -133,7 +133,7 @@ run_query() { /usr/bin/time -l "$SPARK_HOME/bin/spark-submit" "${conf[@]}" \ > "$log_file" 2> "$time_file" || exit_code=$? - # Parse peak RSS from /usr/bin/time -l output (macOS format) + # Parse peak RSS from /usr/bin/time -l output (macOS format: bytes) local peak_rss_bytes peak_rss_bytes=$(grep "maximum resident set size" "$time_file" | awk '{print $1}') || true local peak_rss_mb="N/A" @@ -141,17 +141,12 @@ run_query() { peak_rss_mb=$((peak_rss_bytes / 1048576)) fi - # Parse wall clock time - local wall_time - wall_time=$(grep "elapsed" "$time_file" | head -1 | awk '{print $1}') || true - # Convert MM:SS.xx or H:MM:SS.xx to seconds + # Parse wall clock time (macOS format: " 89.94 real 363.21 user 10.80 sys") local wall_secs="N/A" + local wall_time + wall_time=$(grep "real" "$time_file" | tail -1 | awk '{print $1}') || true if [ -n "$wall_time" ]; then - wall_secs=$(echo "$wall_time" | awk -F: '{ - if (NF==3) print $1*3600 + $2*60 + $3; - else if (NF==2) print $1*60 + $2; - else print $1 - }') + wall_secs="$wall_time" fi if [ "$exit_code" -eq 0 ]; then diff --git a/docs/source/contributor-guide/memory-management.md b/docs/source/contributor-guide/memory-management.md index 9eaf472d5e..e1fa1f1ed6 100644 --- a/docs/source/contributor-guide/memory-management.md +++ b/docs/source/contributor-guide/memory-management.md @@ -148,6 
+148,43 @@ When Spark calls `spill()`, Gluten: across all operators in the task, then retries. `ThrowOnOomMemoryTarget` wraps everything with up to 9 retries with exponential backoff. +## Benchmark Results: TPC-H SF100, local[4], Peak RSS + +Measured with `benchmarks/tpc/memory-profile.sh` using `/usr/bin/time -l` on macOS +(96GB RAM, 28 cores, Spark 3.5.8, no container memory limits). + +| Config | Q1 (aggregation) | Q5 (5-way join) | Q9 (6-way join) | +|--------|-----------------|-----------------|-----------------| +| Spark 4g offHeap | 2700 MB | 5167 MB | 4580 MB | +| Comet 4g offHeap | 679 MB | 5534 MB | 5911 MB | +| Comet 8g offHeap | 665 MB | 5440 MB | 6359 MB | + +### Analysis + +**Aggregation (Q1)**: Comet uses 75% less memory than Spark. No memory concern. + +**Join-heavy queries (Q5, Q9)**: Comet uses more memory than Spark. +- Q5: ~370 MB over Spark, flat across offHeap sizes (fixed overhead) +- Q9: 1331 MB over Spark at 4g, grows to 1779 MB at 8g (elastic — expands with pool) + +**Q9's elastic growth** (450 MB increase from 4g→8g) points to operators that buffer +greedily up to the pool limit. The shuffle writer is designed this way — it accumulates +batches in `buffered_batches` until `try_grow()` fails, then spills +(`multi_partition.rs:395-435`). With more offHeap, it buffers more before spilling. + +**Cross-task eviction**: With `spill()` returning 0, Spark cannot reclaim memory from +one Comet task to give to another. In a container with 16 tasks sharing 16GB, each task's +shuffle writer competes for the shared pool, and no task can force another to spill. + +### Key Insight + +The untracked memory sources (ScanExec FFI batches, array copies) are likely secondary. +The primary memory issue is **elastic buffering combined with broken cross-task eviction**: +1. Shuffle writer greedily fills available pool space +2. Multiple concurrent tasks each do this +3. Spark cannot reclaim from any of them (spill returns 0) +4. 
In a constrained container, this leads to OOM + ## Debugging Steps ### 1. Enable memory debug logging @@ -155,13 +192,14 @@ everything with up to 9 retries with exponential backoff. spark.comet.debug.memory.enabled=true ``` -### 2. Run targeted TPC-DS queries individually -- Join-heavy (likely OOM first): q5, q14a, q14b, q23a, q23b, q24a, q24b, q72, q95 -- Sort/aggregate-heavy: q1, q4, q11, q74 +### 2. Run memory profiling script +```bash +cd benchmarks/tpc +./memory-profile.sh --queries "1 5 9 21" --offheap-sizes "4g 8g 16g" --cores 4 +``` ### 3. Disable operator categories to isolate ``` -spark.comet.exec.hashJoin.enabled=false spark.comet.exec.sortMergeJoin.enabled=false # or spark.comet.exec.shuffle.enabled=false From b54a0cec3a12d36b99f7d3676a8573ffe1a19213 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 07:02:38 -0600 Subject: [PATCH 04/12] docs: add shuffle isolation results and PR 3845 notes --- .../contributor-guide/memory-management.md | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/docs/source/contributor-guide/memory-management.md b/docs/source/contributor-guide/memory-management.md index e1fa1f1ed6..bcadf45183 100644 --- a/docs/source/contributor-guide/memory-management.md +++ b/docs/source/contributor-guide/memory-management.md @@ -176,6 +176,39 @@ batches in `buffered_batches` until `try_grow()` fails, then spills one Comet task to give to another. In a container with 16 tasks sharing 16GB, each task's shuffle writer competes for the shared pool, and no task can force another to spill. 
+### Isolating Shuffle vs Non-Shuffle Overhead (Q9) + +Running Q9 with `spark.comet.exec.shuffle.enabled=false` (Spark handles shuffles, +Comet handles sort/SMJ/aggregate): + +| Config | Q9 Peak RSS | +|--------|-------------| +| Spark only | 4580 MB | +| Comet, shuffle disabled | 5035 MB | +| Comet, full (4g offHeap) | 5911 MB | +| Comet, full (8g offHeap) | 6359 MB | + +- **Non-shuffle overhead** (sort + SMJ + agg + scan): +455 MB fixed +- **Shuffle overhead**: +876 MB at 4g, grows with offHeap size +- **Shuffle elastic growth** (4g→8g): +448 MB + +Shuffle accounts for ~2/3 of the extra memory over Spark. This is also the component +that grows with offHeap size due to greedy buffering. + +### New Shuffle Writer (PR #3845) + +[PR #3845](https://github.com/apache/datafusion-comet/pull/3845) introduces an +"immediate mode" shuffle writer that buffers **compressed IPC-encoded blocks** per +partition instead of raw `RecordBatch` objects. Key differences: + +- **Eager encoding**: Data is IPC-encoded and compressed as it arrives, per partition. + Memory footprint is bounded by `num_partitions * batch_size`, independent of input size. +- **Current writer**: Buffers uncompressed `RecordBatch` objects in a `Vec`, with memory + proportional to total input data. Delays all encoding until spill or final write. +- **Expected reduction**: 30-50% peak memory reduction in benchmarks. +- **Still greedy**: The new writer still buffers until `try_grow()` fails, but the + compressed representation means each buffer unit is much smaller. + ### Key Insight The untracked memory sources (ScanExec FFI batches, array copies) are likely secondary. @@ -185,6 +218,9 @@ The primary memory issue is **elastic buffering combined with broken cross-task 3. Spark cannot reclaim from any of them (spill returns 0) 4. In a constrained container, this leads to OOM +The new shuffle writer (#3845) will help by reducing per-buffer memory, but the +cross-task eviction problem remains. 
+ ## Debugging Steps ### 1. Enable memory debug logging From db233e0962c035b177cfb05da73ff9ec254ad91c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 07:56:52 -0600 Subject: [PATCH 05/12] feat: add SpillState for cross-thread spill coordination --- native/core/src/execution/memory_pools/mod.rs | 1 + .../core/src/execution/memory_pools/spill.rs | 94 +++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 native/core/src/execution/memory_pools/spill.rs diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d8b3473353..c97a60c29a 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -20,6 +20,7 @@ mod fair_pool; pub mod logging_pool; mod task_shared; mod unified_pool; +pub(crate) mod spill; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, diff --git a/native/core/src/execution/memory_pools/spill.rs b/native/core/src/execution/memory_pools/spill.rs new file mode 100644 index 0000000000..df53ce58fc --- /dev/null +++ b/native/core/src/execution/memory_pools/spill.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Condvar, Mutex}; +use std::time::Duration; + +/// Shared state for coordinating spill requests between Spark's memory manager +/// (which calls `NativeMemoryConsumer.spill()` on a Spark thread) and DataFusion +/// operators (which call `try_grow()`/`shrink()` on tokio threads). +/// +/// When Spark needs to reclaim memory from Comet, it sets `pressure` via +/// `request_spill()`. The memory pool's `try_grow()` checks this and returns +/// `ResourcesExhausted`, causing operators to spill. As operators call `shrink()`, +/// freed bytes are accumulated and the waiting Spark thread is notified. +#[derive(Debug)] +pub struct SpillState { + /// Bytes requested to be freed. Set by Spark's spill() callback. + pressure: AtomicUsize, + /// Bytes actually freed since pressure was set. + freed: AtomicUsize, + /// Mutex + Condvar to allow the spill requester to wait for operators to react. + notify: (Mutex<()>, Condvar), +} + +impl SpillState { + pub fn new() -> Self { + Self { + pressure: AtomicUsize::new(0), + freed: AtomicUsize::new(0), + notify: (Mutex::new(()), Condvar::new()), + } + } + + /// Returns the current spill pressure in bytes. Called by the memory pool's + /// `try_grow()` to decide whether to deny allocations. + pub fn pressure(&self) -> usize { + self.pressure.load(Ordering::Acquire) + } + + /// Record that `size` bytes were freed (called from pool's `shrink()`). + /// Wakes the waiting spill requester. + pub fn record_freed(&self, size: usize) { + self.freed.fetch_add(size, Ordering::Release); + let (_lock, cvar) = &self.notify; + cvar.notify_all(); + } + + /// Called from JNI when Spark's `NativeMemoryConsumer.spill()` is invoked. + /// Sets spill pressure and waits (up to `timeout`) for operators to free memory. + /// Returns the actual number of bytes freed. 
+ pub fn request_spill(&self, size: usize, timeout: Duration) -> usize { + // Reset freed counter and set pressure + self.freed.store(0, Ordering::Release); + self.pressure.store(size, Ordering::Release); + + // Wait for operators to react + let (lock, cvar) = &self.notify; + let mut guard = lock.lock().unwrap(); + let deadline = std::time::Instant::now() + timeout; + while self.freed.load(Ordering::Acquire) < size { + let remaining = deadline.saturating_duration_since(std::time::Instant::now()); + if remaining.is_zero() { + break; + } + let (new_guard, _timeout_result) = cvar.wait_timeout(guard, remaining).unwrap(); + guard = new_guard; + } + + // Clear pressure and return freed bytes + self.pressure.store(0, Ordering::Release); + self.freed.load(Ordering::Acquire) + } +} + +impl Default for SpillState { + fn default() -> Self { + Self::new() + } +} From 8d4abe2ca18b861ff8d240de57226dad7c646314 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 07:58:19 -0600 Subject: [PATCH 06/12] feat: wire SpillState into CometUnifiedMemoryPool --- native/core/src/execution/jni_api.rs | 2 +- native/core/src/execution/memory_pools/mod.rs | 11 ++++++++--- .../src/execution/memory_pools/unified_pool.rs | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index e0a395ebbf..bfc5b609ec 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -258,7 +258,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_limit_per_task, )?; let memory_pool = - create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id); + create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id, None); let memory_pool = if logging_memory_pool { Arc::new(LoggingMemoryPool::new(task_attempt_id as u64, memory_pool)) diff --git a/native/core/src/execution/memory_pools/mod.rs 
b/native/core/src/execution/memory_pools/mod.rs index c97a60c29a..72e583f130 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -28,6 +28,7 @@ use datafusion::execution::memory_pool::{ use fair_pool::CometFairMemoryPool; use jni::objects::GlobalRef; use once_cell::sync::OnceCell; +use spill::SpillState; use std::num::NonZeroUsize; use std::sync::Arc; use unified_pool::CometUnifiedMemoryPool; @@ -39,13 +40,17 @@ pub(crate) fn create_memory_pool( memory_pool_config: &MemoryPoolConfig, comet_task_memory_manager: Arc, task_attempt_id: i64, + spill_state: Option>, ) -> Arc { const NUM_TRACKED_CONSUMERS: usize = 10; match memory_pool_config.pool_type { MemoryPoolType::GreedyUnified => { - // Set Comet memory pool for native - let memory_pool = - CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id); + let spill_state = spill_state.unwrap_or_else(|| Arc::new(SpillState::new())); + let memory_pool = CometUnifiedMemoryPool::new( + comet_task_memory_manager, + task_attempt_id, + spill_state, + ); Arc::new(TrackConsumersPool::new( memory_pool, NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs index 3233dd6d40..a5e7a425de 100644 --- a/native/core/src/execution/memory_pools/unified_pool.rs +++ b/native/core/src/execution/memory_pools/unified_pool.rs @@ -23,6 +23,7 @@ use std::{ }, }; +use super::spill::SpillState; use crate::{errors::CometResult, jvm_bridge::JVMClasses}; use datafusion::{ common::{resources_datafusion_err, DataFusionError}, @@ -38,12 +39,14 @@ pub struct CometUnifiedMemoryPool { task_memory_manager_handle: Arc, used: AtomicUsize, task_attempt_id: i64, + spill_state: Arc, } impl Debug for CometUnifiedMemoryPool { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { f.debug_struct("CometUnifiedMemoryPool") .field("used", &self.used.load(Relaxed)) + 
.field("spill_pressure", &self.spill_state.pressure()) .finish() } } @@ -52,11 +55,13 @@ impl CometUnifiedMemoryPool { pub fn new( task_memory_manager_handle: Arc, task_attempt_id: i64, + spill_state: Arc, ) -> CometUnifiedMemoryPool { Self { task_memory_manager_handle, task_attempt_id, used: AtomicUsize::new(0), + spill_state, } } @@ -116,10 +121,21 @@ impl MemoryPool for CometUnifiedMemoryPool { self.task_attempt_id ); } + if self.spill_state.pressure() > 0 { + self.spill_state.record_freed(size); + } } fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), DataFusionError> { if additional > 0 { + if self.spill_state.pressure() > 0 { + return Err(resources_datafusion_err!( + "Task {} denied {} bytes due to spill pressure. Reserved: {}", + self.task_attempt_id, + additional, + self.reserved() + )); + } let acquired = self.acquire_from_spark(additional)?; // If the number of bytes we acquired is less than the requested, return an error, // and hopefully will trigger spilling from the caller side. 
From 9faa80a762f6fa87e481e8d3299b3e40597f7d25 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 08:00:24 -0600 Subject: [PATCH 07/12] feat: add JNI requestSpill function with SpillState in ExecutionContext --- native/core/src/execution/jni_api.rs | 47 ++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index bfc5b609ec..7c817b04de 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -79,8 +79,10 @@ use std::{sync::Arc, task::Poll}; use tokio::runtime::Runtime; use tokio::sync::mpsc; +use crate::execution::memory_pools::spill::SpillState; use crate::execution::memory_pools::{ - create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig, + create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, + MemoryPoolConfig, MemoryPoolType, }; use crate::execution::operators::{ScanExec, ShuffleScanExec}; use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec}; @@ -177,6 +179,8 @@ struct ExecutionContext { pub explain_native: bool, /// Memory pool config pub memory_pool_config: MemoryPoolConfig, + /// Spill state for coordinating spill requests between Spark and native operators + pub spill_state: Option>, /// Whether to log memory usage on each call to execute_plan pub tracing_enabled: bool, } @@ -257,8 +261,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_limit, memory_limit_per_task, )?; - let memory_pool = - create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id, None); + let spill_state = if off_heap_mode != JNI_FALSE + && matches!(memory_pool_config.pool_type, MemoryPoolType::GreedyUnified) + { + Some(Arc::new(SpillState::new())) + } else { + None + }; + + let memory_pool = create_memory_pool( + &memory_pool_config, + task_memory_manager, + task_attempt_id, + 
spill_state.clone(), + ); let memory_pool = if logging_memory_pool { Arc::new(LoggingMemoryPool::new(task_attempt_id as u64, memory_pool)) @@ -326,6 +342,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( debug_native, explain_native, memory_pool_config, + spill_state, tracing_enabled, }); @@ -706,6 +723,30 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan( }) } +/// Called from `CometTaskMemoryManager.spill()` via JNI to request that native +/// operators free memory. Returns the number of bytes actually freed. +/// # Safety +/// This function is inherently unsafe since it deals with raw pointers passed from JNI. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_comet_Native_requestSpill( + e: JNIEnv, + _class: JClass, + exec_context: jlong, + size: jlong, +) -> jlong { + try_unwrap_or_throw(&e, |_env| { + let exec_context = get_execution_context(exec_context); + if let Some(ref spill_state) = exec_context.spill_state { + let timeout = std::time::Duration::from_secs(10); + let freed = spill_state.request_spill(size as usize, timeout); + Ok(freed as jlong) + } else { + // No spill state (not using unified pool) — can't spill + Ok(0i64) + } + }) +} + /// Updates the metrics of the query plan. 
fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> { if let Some(native_query) = &exec_context.root_op { From f82556c7459d4be328d867d6d4702ffb72dbb900 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 08:01:38 -0600 Subject: [PATCH 08/12] feat: implement spill() callback in CometTaskMemoryManager --- .../apache/spark/CometTaskMemoryManager.java | 44 ++++++++++++++++--- .../main/scala/org/apache/comet/Native.scala | 12 +++++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index 3f344da68f..b2514ee37d 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.comet.Native; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.MemoryMode; import org.apache.spark.memory.TaskMemoryManager; @@ -46,6 +47,12 @@ public class CometTaskMemoryManager { private final NativeMemoryConsumer nativeMemoryConsumer; private final AtomicLong used = new AtomicLong(); + /** + * The native ExecutionContext handle. Set after native plan creation. Used to route spill() + * requests to the native memory pool. + */ + private volatile long nativePlanHandle = 0; + public CometTaskMemoryManager(long id, long taskAttemptId) { this.id = id; this.taskAttemptId = taskAttemptId; @@ -53,6 +60,14 @@ public CometTaskMemoryManager(long id, long taskAttemptId) { this.nativeMemoryConsumer = new NativeMemoryConsumer(); } + /** + * Set the native plan handle after plan creation. This enables the spill() callback to route + * requests to the native memory pool. + */ + public void setNativePlanHandle(long handle) { + this.nativePlanHandle = handle; + } + // Called by Comet native through JNI. 
// Returns the actual amount of memory (in bytes) granted. public long acquireMemory(long size) { @@ -97,9 +112,10 @@ public long getUsed() { } /** - * A dummy memory consumer that does nothing when spilling. At the moment, Comet native doesn't - * share the same API as Spark and cannot trigger spill when acquire memory. Therefore, when - * acquiring memory from native or JVM, spilling can only be triggered from JVM operators. + * A memory consumer that routes spill requests to the native memory pool. When Spark's memory + * manager needs to reclaim memory (e.g., for another task), it calls spill() which signals the + * native pool to apply backpressure. DataFusion operators (Sort, Aggregate, Shuffle) react by + * spilling their internal state to disk. */ private class NativeMemoryConsumer extends MemoryConsumer { protected NativeMemoryConsumer() { @@ -108,13 +124,29 @@ protected NativeMemoryConsumer() { @Override public long spill(long size, MemoryConsumer trigger) throws IOException { - // No spilling - return 0; + long handle = nativePlanHandle; + if (handle == 0) { + // Native plan not yet created or already destroyed + return 0; + } + logger.info( + "Task {} received spill request for {} bytes, forwarding to native", + taskAttemptId, + size); + try { + long freed = new Native().requestSpill(handle, size); + logger.info("Task {} native spill freed {} bytes", taskAttemptId, freed); + return freed; + } catch (Exception e) { + logger.warn("Task {} native spill failed: {}", taskAttemptId, e.getMessage()); + return 0; + } } @Override public String toString() { - return String.format("NativeMemoryConsumer(id=%)", id); + return String.format( + "NativeMemoryConsumer(id=%d, taskAttemptId=%d)", id, taskAttemptId); } } } diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index f6800626d6..2054e33cb0 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ 
b/spark/src/main/scala/org/apache/comet/Native.scala @@ -103,6 +103,18 @@ class Native extends NativeBase { */ @native def releasePlan(plan: Long): Unit + /** + * Request that native operators spill memory. Called from CometTaskMemoryManager.spill(). + * + * @param nativePlanHandle + * the native ExecutionContext pointer + * @param size + * bytes requested to free + * @return + * actual bytes freed + */ + @native def requestSpill(nativePlanHandle: Long, size: Long): Long + /** * Used by Comet shuffle external sorter to write sorted records to disk. * From 6a7fc43a260e94926b5da9dab4547f4f237d5e1e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 08:06:32 -0600 Subject: [PATCH 09/12] feat: wire native plan handle to CometTaskMemoryManager for spill routing --- .../java/org/apache/spark/CometTaskMemoryManager.java | 10 ++++------ .../scala/org/apache/comet/CometExecIterator.scala | 8 +++++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index b2514ee37d..454fe7d0cd 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -25,11 +25,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.comet.Native; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.MemoryMode; import org.apache.spark.memory.TaskMemoryManager; +import org.apache.comet.Native; + /** * A adapter class that is used by Comet native to acquire & release memory through Spark's unified * memory manager. This assumes Spark's off-heap memory mode is enabled. 
@@ -130,9 +131,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { return 0; } logger.info( - "Task {} received spill request for {} bytes, forwarding to native", - taskAttemptId, - size); + "Task {} received spill request for {} bytes, forwarding to native", taskAttemptId, size); try { long freed = new Native().requestSpill(handle, size); logger.info("Task {} native spill freed {} bytes", taskAttemptId, freed); @@ -145,8 +144,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { @Override public String toString() { - return String.format( - "NativeMemoryConsumer(id=%d, taskAttemptId=%d)", id, taskAttemptId); + return String.format("NativeMemoryConsumer(id=%d, taskAttemptId=%d)", id, taskAttemptId); } } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index e198ac99ff..734ef240e7 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -110,7 +110,7 @@ class CometExecIterator( val memoryConfig = CometExecIterator.getMemoryConfig(conf) - nativeLib.createPlan( + val handle = nativeLib.createPlan( id, inputIterators, protobufQueryPlan, @@ -128,6 +128,11 @@ class CometExecIterator( taskAttemptId, taskCPUs, keyUnwrapper) + + // Enable spill callback by giving the memory manager a handle to the native plan + cometTaskMemoryManager.setNativePlanHandle(handle) + + handle } private var nextBatch: Option[ColumnarBatch] = None @@ -236,6 +241,7 @@ class CometExecIterator( } nativeUtil.close() shuffleBlockIterators.values.foreach(_.close()) + cometTaskMemoryManager.setNativePlanHandle(0) nativeLib.releasePlan(plan) if (tracingEnabled) { From c73cf734e1d5fc6340155d628270dc7e95f3b4fb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 08:59:45 -0600 Subject: [PATCH 10/12] docs: update memory analysis with spill callback results local[8] 
benchmark shows Comet memory usage now on par with Spark (8525 vs 8476 MB) and elastic growth eliminated. --- .../contributor-guide/memory-management.md | 95 ++++++++++--------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/docs/source/contributor-guide/memory-management.md b/docs/source/contributor-guide/memory-management.md index bcadf45183..54152dec85 100644 --- a/docs/source/contributor-guide/memory-management.md +++ b/docs/source/contributor-guide/memory-management.md @@ -87,20 +87,16 @@ allocations to Sort/Aggregate while the process is already near OOM from untrack This explains the 32GB requirement: you need enough headroom for all untracked memory on top of the tracked pool. -### Spark spill() Callback is Also a No-Op +### Spark spill() Callback -`spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:99-113`: +`NativeMemoryConsumer.spill()` now forwards spill requests to the native memory pool +via JNI. When Spark calls `spill(size)`, it sets a `SpillState.pressure` atomic in the +native pool. The pool's `try_grow()` checks this and returns `ResourcesExhausted`, +causing DataFusion's Sort/Aggregate/Shuffle operators to spill internally. Freed bytes +are tracked via `shrink()` and returned to Spark. -```java -private class NativeMemoryConsumer extends MemoryConsumer { - public long spill(long size, MemoryConsumer trigger) { - return 0; // No spilling - } -} -``` - -Even if memory were tracked, Spark can't reclaim it. But fixing the spill callback -alone wouldn't help without fixing the tracking gaps first. +See `SpillState` in `native/core/src/execution/memory_pools/spill.rs` and the +`requestSpill` JNI function in `native/core/src/execution/jni_api.rs`. ### Per-Operator Status (Default Config) @@ -120,8 +116,8 @@ alone wouldn't help without fixing the tracking gaps first. 
| Capability | Comet | Gluten | |-----------|-------|--------| -| Spark spill callback | Returns 0 (no-op) | Hierarchical spill cascade via TreeMemoryConsumer | -| Hash join spill | None | Velox spills hash tables to disk | +| Spark spill callback | Forwards to native via SpillState pressure | Hierarchical spill cascade via TreeMemoryConsumer | +| Hash join spill | None (experimental, disabled by default) | Velox spills hash tables to disk | | Per-operator tracking | Single NativeMemoryConsumer | Per-operator MemoryTarget children | | Retry on OOM | None | Multi-retry with exponential backoff + GC | | Memory isolation | Proportional per-task limit | Hard per-task cap option | @@ -209,17 +205,32 @@ partition instead of raw `RecordBatch` objects. Key differences: - **Still greedy**: The new writer still buffers until `try_grow()` fails, but the compressed representation means each buffer unit is much smaller. -### Key Insight +### Spill Callback Results + +After implementing the spill callback (`SpillState` + `NativeMemoryConsumer.spill()`), +Q9 was re-benchmarked: -The untracked memory sources (ScanExec FFI batches, array copies) are likely secondary. -The primary memory issue is **elastic buffering combined with broken cross-task eviction**: -1. Shuffle writer greedily fills available pool space -2. Multiple concurrent tasks each do this -3. Spark cannot reclaim from any of them (spill returns 0) -4. In a constrained container, this leads to OOM +**local[4] (4 concurrent tasks):** -The new shuffle writer (#3845) will help by reducing per-buffer memory, but the -cross-task eviction problem remains. 
+| Config | Q9 Before | Q9 After | Change | +|--------|-----------|----------|--------| +| Comet 4g | 5911 MB | 5896 MB | -15 MB | +| Comet 8g | 6359 MB | 6060 MB | -299 MB | +| 4g→8g delta | 448 MB | 164 MB | **-63% elastic growth** | + +**local[8] (8 concurrent tasks):** + +| Config | Q9 RSS | +|--------|--------| +| Spark 4g | 8476 MB | +| Comet 4g | 8525 MB | +| Comet 8g | 8448 MB | +| 4g→8g delta | -77 MB (within noise) | + +With 8 concurrent tasks, Comet's memory usage is **on par with Spark** (8525 vs 8476 MB) +and the elastic growth is effectively eliminated. The spill callback enables Spark to +apply backpressure to Comet's native memory pool, preventing shuffle writers from +greedily expanding when more offHeap is available. ## Debugging Steps @@ -295,20 +306,18 @@ reservation through from the calling ScanExec. `ShuffleScanExec` should hold a `MemoryReservation` and track decompressed batch sizes, similar to the ScanExec fix. -### Priority 4: Implement spill() Callback +### Priority 4: Implement spill() Callback — DONE -After memory tracking is in place, implement a real `spill()` in -`NativeMemoryConsumer` using the shrink-the-pool approach: +Implemented via `SpillState` + JNI `requestSpill`. The shrink-the-pool approach: -1. When Spark calls `spill(size)`, JNI into native to set a `spill_pressure` atomic -2. Pool's `try_grow()` checks `spill_pressure` and returns `ResourcesExhausted` -3. Sort/Aggregate operators react by spilling internally -4. As operators `shrink()` their reservations, decrement `spill_pressure` -5. Return actual bytes freed to Spark +1. When Spark calls `spill(size)`, JNI into native to set `SpillState.pressure` +2. Pool's `try_grow()` checks pressure and returns `ResourcesExhausted` +3. Sort/Aggregate/Shuffle operators react by spilling internally +4. As operators call `shrink()`, freed bytes are tracked and reported back +5. 
Actual bytes freed returned to Spark -This is secondary because fixing tracking (Priorities 1-3) means the pool will -naturally deny allocations when physical memory is scarce, triggering DataFusion's -existing spill without needing Spark coordination. +See `native/core/src/execution/memory_pools/spill.rs` and +`spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java`. ### Priority 5: Window Function Spill @@ -328,24 +337,16 @@ Comet's challenge is the FFI boundary — Arrow arrays arrive from JVM as raw po with no allocator integration. The arrays are allocated by the JVM's Arrow library, and Comet's native side just maps them. Tracking must be explicitly added. -### Implementation Phases - -**Phase 1: Measurement** -- Enable `spark.comet.debug.memory.enabled=true` -- Run TPC-DS 1TB, identify which queries OOM first -- Measure ratio of tracked vs untracked memory +### Remaining Work -**Phase 2: Track ScanExec batches (highest impact)** +**Phase 1: Track ScanExec batches** - Add MemoryReservation to ScanExec - Track batch sizes on import, release on consumption -- Re-test TPC-DS 1TB — this alone may dramatically reduce memory requirement -**Phase 3: Track remaining gaps** +**Phase 2: Track remaining gaps** - copy.rs array copies - shuffle_scan.rs decompression buffers - Selection vector processing -**Phase 4: Spill callback (if needed)** -- Implement shrink-the-pool mechanism in NativeMemoryConsumer -- Wire through JNI to native pool -- Allows Spark to proactively reclaim memory from native operators +**Phase 3: Extend spill callback to FairUnified pool** +- Same pattern as GreedyUnified — add SpillState to CometFairMemoryPool From d43e269a5f0586ee7ec4d78b3e1faaa24f4457b6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 16:57:29 -0600 Subject: [PATCH 11/12] chore: apply cargo fmt and prettier formatting --- .../contributor-guide/memory-management.md | 88 +++++++++++-------- native/core/src/execution/memory_pools/mod.rs | 2 +- 2 files changed, 
50 insertions(+), 40 deletions(-) diff --git a/docs/source/contributor-guide/memory-management.md b/docs/source/contributor-guide/memory-management.md index 54152dec85..2a5af6ad28 100644 --- a/docs/source/contributor-guide/memory-management.md +++ b/docs/source/contributor-guide/memory-management.md @@ -100,27 +100,27 @@ See `SpillState` in `native/core/src/execution/memory_pools/spill.rs` and the ### Per-Operator Status (Default Config) -| Operator | Spillable? | Memory Tracked? | Notes | -|----------|-----------|----------------|-------| -| SortExec | Yes (DF internal) | Yes | Spills on try_grow() failure | -| AggregateExec | Yes (DF internal) | Yes | Spills on try_grow() failure | -| SortMergeJoinExec | Streams | Yes | Low memory — relies on sorted inputs | -| Shuffle writer | Yes | Yes | Only operator with `with_can_spill(true)` | -| ScanExec (FFI batches) | No | **No** | Primary untracked memory source | -| copy_array / unpack | No | **No** | Doubles batch memory untracked | -| ShuffleScanExec | No | **No** | Decompression buffers untracked | -| BoundedWindowAggExec | No | Partial | Entire partitions buffered | -| BroadcastHashJoin | N/A | No (JVM side) | Build side in JVM heap | +| Operator | Spillable? | Memory Tracked? 
| Notes | +| ---------------------- | ----------------- | --------------- | ----------------------------------------- | +| SortExec | Yes (DF internal) | Yes | Spills on try_grow() failure | +| AggregateExec | Yes (DF internal) | Yes | Spills on try_grow() failure | +| SortMergeJoinExec | Streams | Yes | Low memory — relies on sorted inputs | +| Shuffle writer | Yes | Yes | Only operator with `with_can_spill(true)` | +| ScanExec (FFI batches) | No | **No** | Primary untracked memory source | +| copy_array / unpack | No | **No** | Doubles batch memory untracked | +| ShuffleScanExec | No | **No** | Decompression buffers untracked | +| BoundedWindowAggExec | No | Partial | Entire partitions buffered | +| BroadcastHashJoin | N/A | No (JVM side) | Build side in JVM heap | ## Comparison with Gluten -| Capability | Comet | Gluten | -|-----------|-------|--------| -| Spark spill callback | Forwards to native via SpillState pressure | Hierarchical spill cascade via TreeMemoryConsumer | -| Hash join spill | None (experimental, disabled by default) | Velox spills hash tables to disk | -| Per-operator tracking | Single NativeMemoryConsumer | Per-operator MemoryTarget children | -| Retry on OOM | None | Multi-retry with exponential backoff + GC | -| Memory isolation | Proportional per-task limit | Hard per-task cap option | +| Capability | Comet | Gluten | +| --------------------- | ------------------------------------------ | ------------------------------------------------- | +| Spark spill callback | Forwards to native via SpillState pressure | Hierarchical spill cascade via TreeMemoryConsumer | +| Hash join spill | None (experimental, disabled by default) | Velox spills hash tables to disk | +| Per-operator tracking | Single NativeMemoryConsumer | Per-operator MemoryTarget children | +| Retry on OOM | None | Multi-retry with exponential backoff + GC | +| Memory isolation | Proportional per-task limit | Hard per-task cap option | ### Gluten's TreeMemoryConsumer Architecture 
@@ -133,6 +133,7 @@ TreeMemoryConsumer (registered as Spark MemoryConsumer) ``` When Spark calls `spill()`, Gluten: + 1. Walks children sorted by usage (largest first) 2. Calls SHRINK phase (reduce internal buffers) 3. Calls SPILL phase (write to disk) @@ -149,17 +150,18 @@ everything with up to 9 retries with exponential backoff. Measured with `benchmarks/tpc/memory-profile.sh` using `/usr/bin/time -l` on macOS (96GB RAM, 28 cores, Spark 3.5.8, no container memory limits). -| Config | Q1 (aggregation) | Q5 (5-way join) | Q9 (6-way join) | -|--------|-----------------|-----------------|-----------------| -| Spark 4g offHeap | 2700 MB | 5167 MB | 4580 MB | -| Comet 4g offHeap | 679 MB | 5534 MB | 5911 MB | -| Comet 8g offHeap | 665 MB | 5440 MB | 6359 MB | +| Config | Q1 (aggregation) | Q5 (5-way join) | Q9 (6-way join) | +| ---------------- | ---------------- | --------------- | --------------- | +| Spark 4g offHeap | 2700 MB | 5167 MB | 4580 MB | +| Comet 4g offHeap | 679 MB | 5534 MB | 5911 MB | +| Comet 8g offHeap | 665 MB | 5440 MB | 6359 MB | ### Analysis **Aggregation (Q1)**: Comet uses 75% less memory than Spark. No memory concern. **Join-heavy queries (Q5, Q9)**: Comet uses more memory than Spark. 
+ - Q5: ~370 MB over Spark, flat across offHeap sizes (fixed overhead) - Q9: 1331 MB over Spark at 4g, grows to 1779 MB at 8g (elastic — expands with pool) @@ -177,12 +179,12 @@ shuffle writer competes for the shared pool, and no task can force another to sp Running Q9 with `spark.comet.exec.shuffle.enabled=false` (Spark handles shuffles, Comet handles sort/SMJ/aggregate): -| Config | Q9 Peak RSS | -|--------|-------------| -| Spark only | 4580 MB | -| Comet, shuffle disabled | 5035 MB | -| Comet, full (4g offHeap) | 5911 MB | -| Comet, full (8g offHeap) | 6359 MB | +| Config | Q9 Peak RSS | +| ------------------------ | ----------- | +| Spark only | 4580 MB | +| Comet, shuffle disabled | 5035 MB | +| Comet, full (4g offHeap) | 5911 MB | +| Comet, full (8g offHeap) | 6359 MB | - **Non-shuffle overhead** (sort + SMJ + agg + scan): +455 MB fixed - **Shuffle overhead**: +876 MB at 4g, grows with offHeap size @@ -212,19 +214,19 @@ Q9 was re-benchmarked: **local[4] (4 concurrent tasks):** -| Config | Q9 Before | Q9 After | Change | -|--------|-----------|----------|--------| -| Comet 4g | 5911 MB | 5896 MB | -15 MB | -| Comet 8g | 6359 MB | 6060 MB | -299 MB | -| 4g→8g delta | 448 MB | 164 MB | **-63% elastic growth** | +| Config | Q9 Before | Q9 After | Change | +| ----------- | --------- | -------- | ----------------------- | +| Comet 4g | 5911 MB | 5896 MB | -15 MB | +| Comet 8g | 6359 MB | 6060 MB | -299 MB | +| 4g→8g delta | 448 MB | 164 MB | **-63% elastic growth** | **local[8] (8 concurrent tasks):** -| Config | Q9 RSS | -|--------|--------| -| Spark 4g | 8476 MB | -| Comet 4g | 8525 MB | -| Comet 8g | 8448 MB | +| Config | Q9 RSS | +| ----------- | --------------------- | +| Spark 4g | 8476 MB | +| Comet 4g | 8525 MB | +| Comet 8g | 8448 MB | | 4g→8g delta | -77 MB (within noise) | With 8 concurrent tasks, Comet's memory usage is **on par with Spark** (8525 vs 8476 MB) @@ -235,17 +237,20 @@ greedily expanding when more offHeap is available. 
## Debugging Steps ### 1. Enable memory debug logging + ``` spark.comet.debug.memory.enabled=true ``` ### 2. Run memory profiling script + ```bash cd benchmarks/tpc ./memory-profile.sh --queries "1 5 9 21" --offheap-sizes "4g 8g 16g" --cores 4 ``` ### 3. Disable operator categories to isolate + ``` spark.comet.exec.sortMergeJoin.enabled=false # or @@ -253,6 +258,7 @@ spark.comet.exec.shuffle.enabled=false ``` ### 4. Check TrackConsumersPool output on OOM + The error message should list top 10 memory consumers by usage. --- @@ -323,6 +329,7 @@ See `native/core/src/execution/memory_pools/spill.rs` and `BoundedWindowAggExec` buffers entire partitions. For large partitions at 1TB, this needs either: + - Use DataFusion's spillable window variant if available - Fall back to Spark's window operator above a threshold @@ -340,13 +347,16 @@ and Comet's native side just maps them. Tracking must be explicitly added. ### Remaining Work **Phase 1: Track ScanExec batches** + - Add MemoryReservation to ScanExec - Track batch sizes on import, release on consumption **Phase 2: Track remaining gaps** + - copy.rs array copies - shuffle_scan.rs decompression buffers - Selection vector processing **Phase 3: Extend spill callback to FairUnified pool** + - Same pattern as GreedyUnified — add SpillState to CometFairMemoryPool diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index 72e583f130..748304a9ab 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -18,9 +18,9 @@ mod config; mod fair_pool; pub mod logging_pool; +pub(crate) mod spill; mod task_shared; mod unified_pool; -pub(crate) mod spill; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, From 221655a0de8d923e70cdad1fabf068fa09c6019e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Apr 2026 17:00:14 -0600 Subject: [PATCH 12/12] docs: 
remove standalone memory-management doc, add spill coordination to development guide --- docs/source/contributor-guide/development.md | 9 + docs/source/contributor-guide/index.md | 1 - .../contributor-guide/memory-management.md | 362 ------------------ 3 files changed, 9 insertions(+), 363 deletions(-) delete mode 100644 docs/source/contributor-guide/memory-management.md diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md index 47f034c973..7873d04827 100644 --- a/docs/source/contributor-guide/development.md +++ b/docs/source/contributor-guide/development.md @@ -74,6 +74,15 @@ memory reservations. This happens on whatever thread the operator is executing o are thread-safe (they use stored `GlobalRef`s, not thread-locals), but they do trigger `AttachCurrentThread`. +**Spill coordination between Spark and DataFusion.** When Spark's memory manager needs to +reclaim memory, it calls `NativeMemoryConsumer.spill()` on a Spark thread. This routes through +JNI to `SpillState::request_spill()`, which sets a pressure value and blocks. On the native +side, `CometUnifiedMemoryPool::try_grow()` checks `SpillState::pressure()` and returns +`ResourcesExhausted` when pressure is set, causing DataFusion operators to spill. As operators +call `shrink()`, freed bytes are recorded in `SpillState` and the waiting Spark thread is +notified via a `Condvar`. This coordination crosses the JVM/tokio thread boundary — the +requesting thread is a Spark task thread while the responding threads are tokio workers. + **Scalar subqueries call into the JVM.** `Subquery::evaluate()` calls static methods on `CometScalarSubquery` via JNI. These use a static `HashMap`, not thread-locals, so they are safe from any thread. 
diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md index 489d2b3af7..0e46f5c821 100644 --- a/docs/source/contributor-guide/index.md +++ b/docs/source/contributor-guide/index.md @@ -36,7 +36,6 @@ Adding a New Operator Adding a New Expression Tracing Profiling -Memory Management Spark SQL Tests Iceberg Spark Tests SQL File Tests diff --git a/docs/source/contributor-guide/memory-management.md b/docs/source/contributor-guide/memory-management.md deleted file mode 100644 index 2a5af6ad28..0000000000 --- a/docs/source/contributor-guide/memory-management.md +++ /dev/null @@ -1,362 +0,0 @@ - - -# Memory Management - -## Problem - -Comet OOMs running TPC-DS at 1TB scale unless executor offHeap memory is >= 32GB. -Gluten (Velox backend) handles the same workload with ~2GB offHeap. - -## Root Cause: Untracked Memory Bypassing the Memory Pool - -With default settings, Spark uses SortMergeJoin (not HashJoin). Comet's hash join -replacement (`spark.comet.exec.replaceSortMergeJoin`) is experimental and disabled -by default. The operators in play — Sort, Aggregate, SortMergeJoin, Shuffle — all -have DataFusion-internal spill support that triggers on `try_grow()` failure. - -**The real problem is not missing spill support.** It's that the majority of memory -allocations bypass the pool entirely, making the pool's backpressure mechanism useless. - -### Untracked Memory Sources - -#### 1. Input Batches from JVM (scan.rs:136-260) — CRITICAL - -When `ScanExec` pulls Arrow batches from JVM via FFI, the resulting arrays are stored -in `self.batch: Arc>>` but **never registered with a -MemoryReservation**. At 1TB scale, this is the bulk of data flowing through the system. - -``` -JVM → FFI_ArrowArray → ArrayData::from_spark() → make_array() → Vec - ^^^ untracked -``` - -#### 2. 
FFI Array Deep Copies (copy.rs:34-93) - -`copy_array()` and `copy_or_unpack_array()` make deep copies without tracking: - -```rust -pub fn copy_array(array: &dyn Array) -> ArrayRef { - let mut mutable = MutableArrayData::new(vec![&data], false, capacity); - mutable.extend(0, 0, capacity); // allocation not tracked - make_array(mutable.freeze()) -} -``` - -When `arrow_ffi_safe=false`, each incoming batch is duplicated with zero accounting. - -#### 3. Selection Vector Processing (scan.rs:309-370) - -Selection vector filtering creates additional Arrow arrays via FFI + `take()` that -are never tracked. - -#### 4. Shuffle Decompression (shuffle_scan.rs:136-201) - -`read_ipc_compressed()` decompresses full batches without pool tracking. - -#### 5. Window Function Partitions (planner.rs — BoundedWindowAggExec) - -`BoundedWindowAggExec` buffers entire partitions in memory. Not spillable. - -### Why This Causes OOM - -The memory pool's backpressure mechanism works correctly: when `acquire_from_spark()` -returns less than requested (`unified_pool.rs:121-151`), it returns -`ResourcesExhausted`, and Sort/Aggregate catch this and spill. - -But the pool doesn't know about ~80% of actual memory usage. Untracked scan batches, -copies, and decompression buffers consume physical memory invisibly. The pool grants -allocations to Sort/Aggregate while the process is already near OOM from untracked data. - -This explains the 32GB requirement: you need enough headroom for all untracked memory -on top of the tracked pool. - -### Spark spill() Callback - -`NativeMemoryConsumer.spill()` now forwards spill requests to the native memory pool -via JNI. When Spark calls `spill(size)`, it sets a `SpillState.pressure` atomic in the -native pool. The pool's `try_grow()` checks this and returns `ResourcesExhausted`, -causing DataFusion's Sort/Aggregate/Shuffle operators to spill internally. Freed bytes -are tracked via `shrink()` and returned to Spark. 
- -See `SpillState` in `native/core/src/execution/memory_pools/spill.rs` and the -`requestSpill` JNI function in `native/core/src/execution/jni_api.rs`. - -### Per-Operator Status (Default Config) - -| Operator | Spillable? | Memory Tracked? | Notes | -| ---------------------- | ----------------- | --------------- | ----------------------------------------- | -| SortExec | Yes (DF internal) | Yes | Spills on try_grow() failure | -| AggregateExec | Yes (DF internal) | Yes | Spills on try_grow() failure | -| SortMergeJoinExec | Streams | Yes | Low memory — relies on sorted inputs | -| Shuffle writer | Yes | Yes | Only operator with `with_can_spill(true)` | -| ScanExec (FFI batches) | No | **No** | Primary untracked memory source | -| copy_array / unpack | No | **No** | Doubles batch memory untracked | -| ShuffleScanExec | No | **No** | Decompression buffers untracked | -| BoundedWindowAggExec | No | Partial | Entire partitions buffered | -| BroadcastHashJoin | N/A | No (JVM side) | Build side in JVM heap | - -## Comparison with Gluten - -| Capability | Comet | Gluten | -| --------------------- | ------------------------------------------ | ------------------------------------------------- | -| Spark spill callback | Forwards to native via SpillState pressure | Hierarchical spill cascade via TreeMemoryConsumer | -| Hash join spill | None (experimental, disabled by default) | Velox spills hash tables to disk | -| Per-operator tracking | Single NativeMemoryConsumer | Per-operator MemoryTarget children | -| Retry on OOM | None | Multi-retry with exponential backoff + GC | -| Memory isolation | Proportional per-task limit | Hard per-task cap option | - -### Gluten's TreeMemoryConsumer Architecture - -``` -TreeMemoryConsumer (registered as Spark MemoryConsumer) - +-- Child: HashJoin (with Spiller) - +-- Child: HashAggregate (with Spiller) - +-- Child: Shuffle (with Spiller) - +-- Child: Sort (with Spiller) -``` - -When Spark calls `spill()`, Gluten: - -1. 
Walks children sorted by usage (largest first) -2. Calls SHRINK phase (reduce internal buffers) -3. Calls SPILL phase (write to disk) -4. Returns actual bytes freed - -### Gluten's Retry-on-OOM - -`RetryOnOomMemoryTarget` catches allocation failures, triggers a spill cascade -across all operators in the task, then retries. `ThrowOnOomMemoryTarget` wraps -everything with up to 9 retries with exponential backoff. - -## Benchmark Results: TPC-H SF100, local[4], Peak RSS - -Measured with `benchmarks/tpc/memory-profile.sh` using `/usr/bin/time -l` on macOS -(96GB RAM, 28 cores, Spark 3.5.8, no container memory limits). - -| Config | Q1 (aggregation) | Q5 (5-way join) | Q9 (6-way join) | -| ---------------- | ---------------- | --------------- | --------------- | -| Spark 4g offHeap | 2700 MB | 5167 MB | 4580 MB | -| Comet 4g offHeap | 679 MB | 5534 MB | 5911 MB | -| Comet 8g offHeap | 665 MB | 5440 MB | 6359 MB | - -### Analysis - -**Aggregation (Q1)**: Comet uses 75% less memory than Spark. No memory concern. - -**Join-heavy queries (Q5, Q9)**: Comet uses more memory than Spark. - -- Q5: ~370 MB over Spark, flat across offHeap sizes (fixed overhead) -- Q9: 1331 MB over Spark at 4g, grows to 1779 MB at 8g (elastic — expands with pool) - -**Q9's elastic growth** (448 MB increase from 4g→8g) points to operators that buffer -greedily up to the pool limit. The shuffle writer is designed this way — it accumulates -batches in `buffered_batches` until `try_grow()` fails, then spills -(`multi_partition.rs:395-435`). With more offHeap, it buffers more before spilling. - -**Cross-task eviction**: With `spill()` returning 0 (the behavior before the spill -callback was implemented), Spark cannot reclaim memory from -one Comet task to give to another. In a container with 16 tasks sharing 16GB, each task's -shuffle writer competes for the shared pool, and no task can force another to spill. 
- -### Isolating Shuffle vs Non-Shuffle Overhead (Q9) - -Running Q9 with `spark.comet.exec.shuffle.enabled=false` (Spark handles shuffles, -Comet handles sort/SMJ/aggregate): - -| Config | Q9 Peak RSS | -| ------------------------ | ----------- | -| Spark only | 4580 MB | -| Comet, shuffle disabled | 5035 MB | -| Comet, full (4g offHeap) | 5911 MB | -| Comet, full (8g offHeap) | 6359 MB | - -- **Non-shuffle overhead** (sort + SMJ + agg + scan): +455 MB fixed -- **Shuffle overhead**: +876 MB at 4g, grows with offHeap size -- **Shuffle elastic growth** (4g→8g): +448 MB - -Shuffle accounts for ~2/3 of the extra memory over Spark. This is also the component -that grows with offHeap size due to greedy buffering. - -### New Shuffle Writer (PR #3845) - -[PR #3845](https://github.com/apache/datafusion-comet/pull/3845) introduces an -"immediate mode" shuffle writer that buffers **compressed IPC-encoded blocks** per -partition instead of raw `RecordBatch` objects. Key differences: - -- **Eager encoding**: Data is IPC-encoded and compressed as it arrives, per partition. - Memory footprint is bounded by `num_partitions * batch_size`, independent of input size. -- **Current writer**: Buffers uncompressed `RecordBatch` objects in a `Vec`, with memory - proportional to total input data. Delays all encoding until spill or final write. -- **Expected reduction**: 30-50% peak memory reduction in benchmarks. -- **Still greedy**: The new writer still buffers until `try_grow()` fails, but the - compressed representation means each buffer unit is much smaller. 
- -### Spill Callback Results - -After implementing the spill callback (`SpillState` + `NativeMemoryConsumer.spill()`), -Q9 was re-benchmarked: - -**local[4] (4 concurrent tasks):** - -| Config | Q9 Before | Q9 After | Change | -| ----------- | --------- | -------- | ----------------------- | -| Comet 4g | 5911 MB | 5896 MB | -15 MB | -| Comet 8g | 6359 MB | 6060 MB | -299 MB | -| 4g→8g delta | 448 MB | 164 MB | **-63% elastic growth** | - -**local[8] (8 concurrent tasks):** - -| Config | Q9 RSS | -| ----------- | --------------------- | -| Spark 4g | 8476 MB | -| Comet 4g | 8525 MB | -| Comet 8g | 8448 MB | -| 4g→8g delta | -77 MB (within noise) | - -With 8 concurrent tasks, Comet's memory usage is **on par with Spark** (8525 vs 8476 MB) -and the elastic growth is effectively eliminated. The spill callback enables Spark to -apply backpressure to Comet's native memory pool, preventing shuffle writers from -greedily expanding when more offHeap is available. - -## Debugging Steps - -### 1. Enable memory debug logging - -``` -spark.comet.debug.memory.enabled=true -``` - -### 2. Run memory profiling script - -```bash -cd benchmarks/tpc -./memory-profile.sh --queries "1 5 9 21" --offheap-sizes "4g 8g 16g" --cores 4 -``` - -### 3. Disable operator categories to isolate - -``` -spark.comet.exec.sortMergeJoin.enabled=false -# or -spark.comet.exec.shuffle.enabled=false -``` - -### 4. Check TrackConsumersPool output on OOM - -The error message should list top 10 memory consumers by usage. - ---- - -## Solution Design - -### The Real Fix: Track All Memory Through the Pool - -DataFusion's spill model already works — Sort and Aggregate react to -`ResourcesExhausted` by spilling. The fix is to make the pool aware of all -memory so backpressure triggers at the right time. 
- -### Priority 1: Track ScanExec Input Batches (Highest Impact) - -**Location:** `native/core/src/execution/operators/scan.rs` - -ScanExec should hold a `MemoryReservation` and call `try_grow()` for each batch -received from the JVM. If the pool denies the allocation, downstream operators -will already be under spill pressure. - -```rust -// Proposed change in ScanExec -struct ScanExec { - // ... existing fields ... - reservation: MemoryReservation, // NEW: track batch memory -} - -impl ScanExec { - fn get_next(&mut self) -> Result { - let batch = self.import_from_jvm()?; - let batch_size = batch.get_array_memory_size(); - self.reservation.try_grow(batch_size)?; // Account for it - // ... store batch ... - } -} -``` - -When the batch is consumed/dropped, `reservation.shrink()` releases the memory. - -This single change would make the pool aware of the largest untracked memory -category and allow backpressure to trigger spill in downstream Sort/Aggregate. - -### Priority 2: Track Array Copies (copy.rs) - -`copy_array()` and `copy_or_unpack_array()` should accept a `&MemoryReservation` -and call `try_grow()` before allocating the copy. This requires threading the -reservation through from the calling ScanExec. - -### Priority 3: Track Shuffle Decompression (shuffle_scan.rs) - -`ShuffleScanExec` should hold a `MemoryReservation` and track decompressed batch -sizes, similar to the ScanExec fix. - -### Priority 4: Implement spill() Callback — DONE - -Implemented via `SpillState` + JNI `requestSpill`. The shrink-the-pool approach: - -1. When Spark calls `spill(size)`, JNI into native to set `SpillState.pressure` -2. Pool's `try_grow()` checks pressure and returns `ResourcesExhausted` -3. Sort/Aggregate/Shuffle operators react by spilling internally -4. As operators call `shrink()`, freed bytes are tracked and reported back -5. 
Actual bytes freed returned to Spark - -See `native/core/src/execution/memory_pools/spill.rs` and -`spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java`. - -### Priority 5: Window Function Spill - -`BoundedWindowAggExec` buffers entire partitions. For large partitions at 1TB, -this needs either: - -- Use DataFusion's spillable window variant if available -- Fall back to Spark's window operator above a threshold - -### Comparison with Gluten's Approach - -Gluten solves this differently because Velox owns the entire execution pipeline -including scans. All memory flows through Velox's allocator, which calls back to -Java's `ReservationListener` on every allocation. The `TreeMemoryConsumer` hierarchy -then coordinates spill across operators. - -Comet's challenge is the FFI boundary — Arrow arrays arrive from JVM as raw pointers -with no allocator integration. The arrays are allocated by the JVM's Arrow library, -and Comet's native side just maps them. Tracking must be explicitly added. - -### Remaining Work - -**Phase 1: Track ScanExec batches** - -- Add MemoryReservation to ScanExec -- Track batch sizes on import, release on consumption - -**Phase 2: Track remaining gaps** - -- copy.rs array copies -- shuffle_scan.rs decompression buffers -- Selection vector processing - -**Phase 3: Extend spill callback to FairUnified pool** - -- Same pattern as GreedyUnified — add SpillState to CometFairMemoryPool