diff --git a/benchmarks/tpc/memory-profile.sh b/benchmarks/tpc/memory-profile.sh
new file mode 100755
index 0000000000..be088f8e0e
--- /dev/null
+++ b/benchmarks/tpc/memory-profile.sh
@@ -0,0 +1,206 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Memory profiling script for TPC-H queries.
+# Runs each query under different configurations and records peak RSS.
+#
+# Usage:
+# ./memory-profile.sh [--queries "1 5 9"] [--offheap-sizes "4g 8g 16g"]
+#                     [--cores 4] [--data /path/to/tpch]
+#                     [--output /path/to/results] [--driver-memory 8g]
+#
+# Requires: SPARK_HOME, and COMET_JAR or a SNAPSHOT jar already built under spark/target
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
+
+# Defaults
+QUERIES="${QUERIES:-$(seq 1 22)}"
+OFFHEAP_SIZES="${OFFHEAP_SIZES:-4g 8g 16g}"
+LOCAL_CORES="${LOCAL_CORES:-4}"
+TPCH_DATA="${TPCH_DATA:-/opt/tpch/sf100}"
+SPARK_HOME="${SPARK_HOME:-/opt/spark-3.5.8-bin-hadoop3}"
+DRIVER_MEMORY="${DRIVER_MEMORY:-8g}"
+OUTPUT_DIR="${OUTPUT_DIR:-$SCRIPT_DIR/memory-profile-results}"
+
+# Find Comet JAR
+if [ -z "${COMET_JAR:-}" ]; then
+    # `|| true` keeps `set -e`/`pipefail` from aborting on an empty match, so the
+    # explicit error message below is actually reached.
+    COMET_JAR=$(ls "$REPO_ROOT"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar 2>/dev/null \
+        | grep -v sources | grep -v test | head -1 || true)
+    if [ -z "$COMET_JAR" ]; then
+        echo "Error: No Comet JAR found. Set COMET_JAR or run 'make'."
+        exit 1
+    fi
+fi
+
+# Parse args
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        --queries) QUERIES="$2"; shift 2 ;;
+        --offheap-sizes) OFFHEAP_SIZES="$2"; shift 2 ;;
+        --cores) LOCAL_CORES="$2"; shift 2 ;;
+        --data) TPCH_DATA="$2"; shift 2 ;;
+        --output) OUTPUT_DIR="$2"; shift 2 ;;
+        --driver-memory) DRIVER_MEMORY="$2"; shift 2 ;;
+        *) echo "Unknown option: $1"; exit 1 ;;
+    esac
+done
+
+# /usr/bin/time reports peak RSS differently per platform:
+#   macOS:    `-l` prints "<bytes> maximum resident set size"
+#   GNU time: `-f` with %M (peak RSS in KB) and %e (elapsed seconds)
+if [ "$(uname -s)" = "Darwin" ]; then
+    TIME_FLAVOR="macos"
+    TIME_ARGS=(-l)
+    RSS_UNITS_PER_MB=1048576
+else
+    TIME_FLAVOR="gnu"
+    TIME_ARGS=(-f "memory-profile-stats %M %e")
+    RSS_UNITS_PER_MB=1024
+fi
+
+mkdir -p "$OUTPUT_DIR"
+RESULTS_FILE="$OUTPUT_DIR/results.csv"
+LOG_DIR="$OUTPUT_DIR/logs"
+mkdir -p "$LOG_DIR"
+
+# Ensure spark-events dir exists
+mkdir -p /tmp/spark-events
+
+echo "Configuration:"
+echo " SPARK_HOME: $SPARK_HOME"
+echo " COMET_JAR: $COMET_JAR"
+echo " TPCH_DATA: $TPCH_DATA"
+echo " LOCAL_CORES: $LOCAL_CORES"
+echo " DRIVER_MEMORY: $DRIVER_MEMORY"
+echo " OFFHEAP_SIZES: $OFFHEAP_SIZES"
+echo " QUERIES: $QUERIES"
+echo " OUTPUT_DIR: $OUTPUT_DIR"
+echo ""
+
+# CSV header
+echo "engine,offheap_size,query,peak_rss_mb,wall_time_sec,exit_code" > "$RESULTS_FILE"
+
+# Run a single query and capture peak RSS
+run_query() {
+    local engine="$1"
+    local offheap="$2"
+    local query_num="$3"
+    local label="${engine}-offheap${offheap}-q${query_num}"
+    local log_file="$LOG_DIR/${label}.log"
+    local time_file="$LOG_DIR/${label}.time"
+
+    # Common Spark conf
+    local conf=(
+        --master "local[${LOCAL_CORES}]"
+        --driver-memory "$DRIVER_MEMORY"
+        --conf "spark.memory.offHeap.enabled=true"
+        --conf "spark.memory.offHeap.size=${offheap}"
+        --conf "spark.eventLog.enabled=false"
+        --conf "spark.ui.enabled=false"
+    )
+
+    if [ "$engine" = "comet" ]; then
+        conf+=(
+            --jars "$COMET_JAR"
+            --driver-class-path "$COMET_JAR"
+            --conf "spark.driver.extraClassPath=$COMET_JAR"
+            --conf "spark.plugins=org.apache.spark.CometPlugin"
+            --conf "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
+            --conf "spark.comet.scan.impl=native_datafusion"
+            --conf "spark.comet.expression.Cast.allowIncompatible=true"
+        )
+    fi
+
+    conf+=(
+        "$SCRIPT_DIR/tpcbench.py"
+        --name "$label"
+        --benchmark tpch
+        --data "$TPCH_DATA"
+        --format parquet
+        --output "$OUTPUT_DIR"
+        --iterations 1
+        --query "$query_num"
+    )
+
+    echo -n " $label ... "
+
+    # Run under /usr/bin/time. Its report goes to stderr together with whatever
+    # spark-submit writes there, so both end up in $time_file and are parsed below.
+    local exit_code=0
+    /usr/bin/time "${TIME_ARGS[@]}" "$SPARK_HOME/bin/spark-submit" "${conf[@]}" \
+        > "$log_file" 2> "$time_file" || exit_code=$?
+
+    # Extract the raw peak RSS figure and the wall clock seconds
+    local peak_rss_raw
+    local wall_time
+    if [ "$TIME_FLAVOR" = "macos" ]; then
+        # macOS format: "<bytes> maximum resident set size" and
+        # " 89.94 real 363.21 user 10.80 sys"
+        peak_rss_raw=$(grep "maximum resident set size" "$time_file" | awk '{print $1}') || true
+        wall_time=$(grep "real" "$time_file" | tail -1 | awk '{print $1}') || true
+    else
+        # GNU format (set via -f): "memory-profile-stats <peak RSS KB> <elapsed secs>"
+        peak_rss_raw=$(grep "^memory-profile-stats " "$time_file" | tail -1 | awk '{print $2}') || true
+        wall_time=$(grep "^memory-profile-stats " "$time_file" | tail -1 | awk '{print $3}') || true
+    fi
+
+    local peak_rss_mb="N/A"
+    if [ -n "$peak_rss_raw" ]; then
+        peak_rss_mb=$((peak_rss_raw / RSS_UNITS_PER_MB))
+    fi
+
+    local wall_secs="N/A"
+    if [ -n "$wall_time" ]; then
+        wall_secs="$wall_time"
+    fi
+
+    if [ "$exit_code" -eq 0 ]; then
+        echo "RSS=${peak_rss_mb}MB, time=${wall_secs}s"
+    else
+        echo "FAILED (exit=$exit_code), RSS=${peak_rss_mb}MB"
+    fi
+
+    echo "${engine},${offheap},${query_num},${peak_rss_mb},${wall_secs},${exit_code}" >> "$RESULTS_FILE"
+}
+
+# Main loop
+echo "=== Running Spark baseline (no Comet) ==="
+for q in $QUERIES; do
+    run_query "spark" "4g" "$q"
+done
+
+echo ""
+echo "=== Running Comet with varying offHeap sizes ==="
+for offheap in $OFFHEAP_SIZES; do
+    echo "--- offHeap = $offheap ---"
+    for q in $QUERIES; do
+        run_query "comet" "$offheap" "$q"
+    done
+    echo ""
+done
+
+echo "=== Results ==="
+column -t -s, "$RESULTS_FILE"
+echo ""
+echo "Full results: $RESULTS_FILE"
+echo "Logs: $LOG_DIR/"
diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 47f034c973..7873d04827 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -74,6 +74,15 @@ memory reservations. This happens on whatever thread the operator is executing o
 are thread-safe (they use stored `GlobalRef`s, not thread-locals), but they do trigger
 `AttachCurrentThread`.
 
+**Spill coordination between Spark and DataFusion.** When Spark's memory manager needs to
+reclaim memory, it calls `NativeMemoryConsumer.spill()` on a Spark thread.
This routes through
+JNI to `SpillState::request_spill()`, which sets a pressure value and blocks. On the native
+side, `CometUnifiedMemoryPool::try_grow()` checks `SpillState::pressure()` and returns
+`ResourcesExhausted` when pressure is set, causing DataFusion operators to spill. As operators
+call `shrink()`, freed bytes are recorded in `SpillState` and the waiting Spark thread is
+notified via a `Condvar`. This coordination crosses the JVM/tokio thread boundary — the
+requesting thread is a Spark task thread while the responding threads are tokio workers.
+
 **Scalar subqueries call into the JVM.** `Subquery::evaluate()` calls static methods on
 `CometScalarSubquery` via JNI. These use a static `HashMap`, not thread-locals, so they are
 safe from any thread.
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index e0a395ebbf..7c817b04de 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -79,8 +79,10 @@ use std::{sync::Arc, task::Poll};
 use tokio::runtime::Runtime;
 use tokio::sync::mpsc;
 
+use crate::execution::memory_pools::spill::SpillState;
 use crate::execution::memory_pools::{
-    create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig,
+    create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config,
+    MemoryPoolConfig, MemoryPoolType,
 };
 use crate::execution::operators::{ScanExec, ShuffleScanExec};
 use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
@@ -177,6 +179,8 @@ struct ExecutionContext {
     pub explain_native: bool,
     /// Memory pool config
     pub memory_pool_config: MemoryPoolConfig,
+    /// Spill state for coordinating spill requests between Spark and native operators
+    pub spill_state: Option<Arc<SpillState>>,
     /// Whether to log memory usage on each call to execute_plan
     pub tracing_enabled: bool,
 }
@@ -257,8 +261,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             memory_limit,
             memory_limit_per_task,
         )?;
-        let memory_pool =
-            create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
+        let spill_state = if off_heap_mode != JNI_FALSE
+            && matches!(memory_pool_config.pool_type, MemoryPoolType::GreedyUnified)
+        {
+            Some(Arc::new(SpillState::new()))
+        } else {
+            None
+        };
+
+        let memory_pool = create_memory_pool(
+            &memory_pool_config,
+            task_memory_manager,
+            task_attempt_id,
+            spill_state.clone(),
+        );
 
         let memory_pool = if logging_memory_pool {
             Arc::new(LoggingMemoryPool::new(task_attempt_id as u64, memory_pool))
@@ -326,6 +342,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             debug_native,
             explain_native,
             memory_pool_config,
+            spill_state,
             tracing_enabled,
         });
 
@@ -706,6 +723,41 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(
     })
 }
 
+/// Called from `CometTaskMemoryManager.spill()` via JNI to request that native
+/// operators free memory. Returns the number of bytes actually freed.
+///
+/// Blocks the calling thread for up to 10 seconds while waiting for operators to
+/// release memory. Returns 0 without waiting when `size` is not positive or when the
+/// plan has no spill state.
+///
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
+#[no_mangle]
+pub unsafe extern "system" fn Java_org_apache_comet_Native_requestSpill(
+    e: JNIEnv,
+    _class: JClass,
+    exec_context: jlong,
+    size: jlong,
+) -> jlong {
+    try_unwrap_or_throw(&e, |_env| {
+        // A negative `jlong` would wrap to an enormous `usize` and keep the pool under
+        // pressure until the timeout; treat non-positive requests as "nothing to do".
+        if size <= 0 {
+            return Ok(0i64);
+        }
+        let exec_context = get_execution_context(exec_context);
+        if let Some(ref spill_state) = exec_context.spill_state {
+            let timeout = std::time::Duration::from_secs(10);
+            let freed = spill_state.request_spill(size as usize, timeout);
+            // Saturate instead of wrapping if the freed total exceeds `jlong::MAX`.
+            Ok(jlong::try_from(freed).unwrap_or(jlong::MAX))
+        } else {
+            // No spill state (not using unified pool) — can't spill
+            Ok(0i64)
+        }
+    })
+}
+
 /// Updates the metrics of the query plan.
fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> {
     if let Some(native_query) = &exec_context.root_op {
diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index d8b3473353..748304a9ab 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -18,6 +18,7 @@
 mod config;
 mod fair_pool;
 pub mod logging_pool;
+pub(crate) mod spill;
 mod task_shared;
 mod unified_pool;
 
@@ -27,6 +28,7 @@ use datafusion::execution::memory_pool::{
 use fair_pool::CometFairMemoryPool;
 use jni::objects::GlobalRef;
 use once_cell::sync::OnceCell;
+use spill::SpillState;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 use unified_pool::CometUnifiedMemoryPool;
@@ -38,13 +40,17 @@ pub(crate) fn create_memory_pool(
     memory_pool_config: &MemoryPoolConfig,
     comet_task_memory_manager: Arc<GlobalRef>,
     task_attempt_id: i64,
+    spill_state: Option<Arc<SpillState>>,
 ) -> Arc<dyn MemoryPool> {
     const NUM_TRACKED_CONSUMERS: usize = 10;
     match memory_pool_config.pool_type {
         MemoryPoolType::GreedyUnified => {
-            // Set Comet memory pool for native
-            let memory_pool =
-                CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
+            let spill_state = spill_state.unwrap_or_else(|| Arc::new(SpillState::new()));
+            let memory_pool = CometUnifiedMemoryPool::new(
+                comet_task_memory_manager,
+                task_attempt_id,
+                spill_state,
+            );
             Arc::new(TrackConsumersPool::new(
                 memory_pool,
                 NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
diff --git a/native/core/src/execution/memory_pools/spill.rs b/native/core/src/execution/memory_pools/spill.rs
new file mode 100644
index 0000000000..df53ce58fc
--- /dev/null
+++ b/native/core/src/execution/memory_pools/spill.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Condvar, Mutex};
+use std::time::Duration;
+
+/// Shared state for coordinating spill requests between Spark's memory manager
+/// (which calls `NativeMemoryConsumer.spill()` on a Spark thread) and DataFusion
+/// operators (which call `try_grow()`/`shrink()` on tokio threads).
+///
+/// When Spark needs to reclaim memory from Comet, it sets `pressure` via
+/// `request_spill()`. The memory pool's `try_grow()` checks this and returns
+/// `ResourcesExhausted`, causing operators to spill. As operators call `shrink()`,
+/// freed bytes are accumulated and the waiting Spark thread is notified.
+#[derive(Debug)]
+pub struct SpillState {
+    /// Bytes requested to be freed. Set by Spark's spill() callback.
+    pressure: AtomicUsize,
+    /// Bytes actually freed since pressure was set.
+    freed: AtomicUsize,
+    /// Mutex + Condvar to allow the spill requester to wait for operators to react.
+    notify: (Mutex<()>, Condvar),
+}
+
+impl SpillState {
+    pub fn new() -> Self {
+        Self {
+            pressure: AtomicUsize::new(0),
+            freed: AtomicUsize::new(0),
+            notify: (Mutex::new(()), Condvar::new()),
+        }
+    }
+
+    /// Returns the current spill pressure in bytes. Called by the memory pool's
+    /// `try_grow()` to decide whether to deny allocations.
+    pub fn pressure(&self) -> usize {
+        self.pressure.load(Ordering::Acquire)
+    }
+
+    /// Record that `size` bytes were freed (called from pool's `shrink()`).
+    /// Wakes the waiting spill requester.
+    pub fn record_freed(&self, size: usize) {
+        self.freed.fetch_add(size, Ordering::Release);
+        let (lock, cvar) = &self.notify;
+        // Notify while holding the mutex. The requester evaluates `freed < size` with the
+        // mutex held, so without this the update and wakeup could both land between its
+        // check and its wait, and the wakeup would be lost until the timeout.
+        let _guard = lock.lock().unwrap();
+        cvar.notify_all();
+    }
+
+    /// Called from JNI when Spark's `NativeMemoryConsumer.spill()` is invoked.
+    /// Sets spill pressure and waits (up to `timeout`) for operators to free memory.
+    /// Returns the actual number of bytes freed.
+    pub fn request_spill(&self, size: usize, timeout: Duration) -> usize {
+        // Reset freed counter and set pressure
+        self.freed.store(0, Ordering::Release);
+        self.pressure.store(size, Ordering::Release);
+
+        // Wait until enough bytes were freed or `timeout` elapses. The predicate is
+        // re-checked under the mutex after every wakeup, which also covers spurious ones.
+        let (lock, cvar) = &self.notify;
+        let guard = lock.lock().unwrap();
+        let (guard, _timeout_result) = cvar
+            .wait_timeout_while(guard, timeout, |_| {
+                self.freed.load(Ordering::Acquire) < size
+            })
+            .unwrap();
+        drop(guard);
+
+        // Clear pressure and return freed bytes
+        self.pressure.store(0, Ordering::Release);
+        self.freed.load(Ordering::Acquire)
+    }
+}
+
+impl Default for SpillState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs
index 3233dd6d40..a5e7a425de 100644
--- a/native/core/src/execution/memory_pools/unified_pool.rs
+++ b/native/core/src/execution/memory_pools/unified_pool.rs
@@ -23,6 +23,7 @@ use std::{
     },
 };
 
+use super::spill::SpillState;
 use crate::{errors::CometResult, jvm_bridge::JVMClasses};
 use datafusion::{
     common::{resources_datafusion_err, DataFusionError},
@@ -38,12 +39,14 @@ pub struct CometUnifiedMemoryPool {
     task_memory_manager_handle: Arc<GlobalRef>,
     used:
AtomicUsize,
     task_attempt_id: i64,
+    spill_state: Arc<SpillState>,
 }
 
 impl Debug for CometUnifiedMemoryPool {
     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
         f.debug_struct("CometUnifiedMemoryPool")
             .field("used", &self.used.load(Relaxed))
+            .field("spill_pressure", &self.spill_state.pressure())
             .finish()
     }
 }
@@ -52,11 +55,13 @@ impl CometUnifiedMemoryPool {
     pub fn new(
         task_memory_manager_handle: Arc<GlobalRef>,
         task_attempt_id: i64,
+        spill_state: Arc<SpillState>,
     ) -> CometUnifiedMemoryPool {
         Self {
             task_memory_manager_handle,
             task_attempt_id,
             used: AtomicUsize::new(0),
+            spill_state,
         }
     }
 
@@ -116,10 +121,21 @@ impl MemoryPool for CometUnifiedMemoryPool {
                 self.task_attempt_id
             );
         }
+        if self.spill_state.pressure() > 0 {
+            self.spill_state.record_freed(size);
+        }
     }
 
     fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), DataFusionError> {
         if additional > 0 {
+            if self.spill_state.pressure() > 0 {
+                return Err(resources_datafusion_err!(
+                    "Task {} denied {} bytes due to spill pressure. Reserved: {}",
+                    self.task_attempt_id,
+                    additional,
+                    self.reserved()
+                ));
+            }
             let acquired = self.acquire_from_spark(additional)?;
             // If the number of bytes we acquired is less than the requested, return an error,
             // and hopefully will trigger spilling from the caller side.
diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
index 3f344da68f..454fe7d0cd 100644
--- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
+++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
@@ -29,6 +29,8 @@
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 
+import org.apache.comet.Native;
+
 /**
  * A adapter class that is used by Comet native to acquire & release memory through Spark's unified
  * memory manager. This assumes Spark's off-heap memory mode is enabled.
@@ -46,6 +48,12 @@ public class CometTaskMemoryManager {
   private final NativeMemoryConsumer nativeMemoryConsumer;
   private final AtomicLong used = new AtomicLong();
 
+  /**
+   * The native ExecutionContext handle. Set after native plan creation. Used to route spill()
+   * requests to the native memory pool.
+   */
+  private volatile long nativePlanHandle = 0;
+
   public CometTaskMemoryManager(long id, long taskAttemptId) {
     this.id = id;
     this.taskAttemptId = taskAttemptId;
@@ -53,6 +61,14 @@ public CometTaskMemoryManager(long id, long taskAttemptId) {
     this.nativeMemoryConsumer = new NativeMemoryConsumer();
   }
 
+  /**
+   * Set the native plan handle after plan creation. This enables the spill() callback to route
+   * requests to the native memory pool.
+   */
+  public void setNativePlanHandle(long handle) {
+    this.nativePlanHandle = handle;
+  }
+
   // Called by Comet native through JNI.
   // Returns the actual amount of memory (in bytes) granted.
   public long acquireMemory(long size) {
@@ -97,9 +113,10 @@ public long getUsed() {
   }
 
   /**
-   * A dummy memory consumer that does nothing when spilling. At the moment, Comet native doesn't
-   * share the same API as Spark and cannot trigger spill when acquire memory. Therefore, when
-   * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
+   * A memory consumer that routes spill requests to the native memory pool. When Spark's memory
+   * manager needs to reclaim memory (e.g., for another consumer in the same task), it calls
+   * spill() which signals the native pool to apply backpressure. DataFusion operators (Sort,
+   * Aggregate, Shuffle) react by spilling their internal state to disk.
    */
   private class NativeMemoryConsumer extends MemoryConsumer {
     protected NativeMemoryConsumer() {
@@ -108,13 +125,27 @@ protected NativeMemoryConsumer() {
 
     @Override
     public long spill(long size, MemoryConsumer trigger) throws IOException {
-      // No spilling
-      return 0;
+      long handle = nativePlanHandle;
+      if (handle == 0) {
+        // Native plan not yet created or already destroyed
+        return 0;
+      }
+      logger.info(
+          "Task {} received spill request for {} bytes, forwarding to native", taskAttemptId, size);
+      try {
+        long freed = new Native().requestSpill(handle, size);
+        logger.info("Task {} native spill freed {} bytes", taskAttemptId, freed);
+        return freed;
+      } catch (Exception e) {
+        // Log the throwable itself so the stack trace is kept, not only its message.
+        logger.warn("Task {} native spill failed", taskAttemptId, e);
+        return 0;
+      }
     }
 
     @Override
     public String toString() {
-      return String.format("NativeMemoryConsumer(id=%)", id);
+      return String.format("NativeMemoryConsumer(id=%d, taskAttemptId=%d)", id, taskAttemptId);
     }
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index e198ac99ff..734ef240e7 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -110,7 +110,7 @@ class CometExecIterator(
 
     val memoryConfig = CometExecIterator.getMemoryConfig(conf)
 
-    nativeLib.createPlan(
+    val handle = nativeLib.createPlan(
       id,
       inputIterators,
       protobufQueryPlan,
@@ -128,6 +128,11 @@ class CometExecIterator(
       taskAttemptId,
       taskCPUs,
       keyUnwrapper)
+
+    // Enable spill callback by giving the memory manager a handle to the native plan
+    cometTaskMemoryManager.setNativePlanHandle(handle)
+
+    handle
   }
 
   private var nextBatch: Option[ColumnarBatch] = None
@@ -236,6 +241,8 @@ class CometExecIterator(
       }
       nativeUtil.close()
       shuffleBlockIterators.values.foreach(_.close())
+      // Clear the handle first so spill() calls that start after this point return early
+      cometTaskMemoryManager.setNativePlanHandle(0)
       nativeLib.releasePlan(plan)
 
       if (tracingEnabled) {
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index f6800626d6..2054e33cb0 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -103,6 +103,19 @@ class Native extends NativeBase {
    */
   @native def releasePlan(plan: Long): Unit
 
+  /**
+   * Request that native operators spill memory. Called from CometTaskMemoryManager.spill().
+   * Blocks the caller until the native side returns the freed byte count.
+   *
+   * @param nativePlanHandle
+   *   the native ExecutionContext pointer
+   * @param size
+   *   bytes requested to free
+   * @return
+   *   actual bytes freed
+   */
+  @native def requestSpill(nativePlanHandle: Long, size: Long): Long
+
   /**
    * Used by Comet shuffle external sorter to write sorted records to disk.
    *