Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions benchmarks/tpc/memory-profile.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Memory profiling script for TPC-H queries.
# Runs each query under different configurations and records peak RSS.
#
# Usage:
# ./memory-profile.sh [--queries "1 5 9"] [--offheap-sizes "4g 8g 16g"]
# [--cores 4] [--data /path/to/tpch]
#
# Requires: SPARK_HOME, COMET_JAR (or builds from source)

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"

# Defaults — every value can be overridden via the environment or the CLI flags below.
QUERIES="${QUERIES:-$(seq 1 22)}"
OFFHEAP_SIZES="${OFFHEAP_SIZES:-4g 8g 16g}"
LOCAL_CORES="${LOCAL_CORES:-4}"
TPCH_DATA="${TPCH_DATA:-/opt/tpch/sf100}"
SPARK_HOME="${SPARK_HOME:-/opt/spark-3.5.8-bin-hadoop3}"
DRIVER_MEMORY="${DRIVER_MEMORY:-8g}"
OUTPUT_DIR="${OUTPUT_DIR:-$SCRIPT_DIR/memory-profile-results}"

# Find Comet JAR.
# FIX: under `set -euo pipefail`, `grep -v` exits non-zero when it selects no
# lines, so when no JAR matched this pipeline previously killed the script
# *silently* before the friendly error below could print. `|| true` keeps the
# empty-result path reachable.
if [ -z "${COMET_JAR:-}" ]; then
    COMET_JAR=$(ls "$REPO_ROOT"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar 2>/dev/null \
        | grep -v sources | grep -v test | head -1) || true
    if [ -z "$COMET_JAR" ]; then
        echo "Error: No Comet JAR found. Set COMET_JAR or run 'make'."
        exit 1
    fi
fi

# Parse args (flags take precedence over environment defaults)
while [[ $# -gt 0 ]]; do
    case $1 in
        --queries) QUERIES="$2"; shift 2 ;;
        --offheap-sizes) OFFHEAP_SIZES="$2"; shift 2 ;;
        --cores) LOCAL_CORES="$2"; shift 2 ;;
        --data) TPCH_DATA="$2"; shift 2 ;;
        --output) OUTPUT_DIR="$2"; shift 2 ;;
        --driver-memory) DRIVER_MEMORY="$2"; shift 2 ;;
        *) echo "Unknown option: $1"; exit 1 ;;
    esac
done

mkdir -p "$OUTPUT_DIR"
RESULTS_FILE="$OUTPUT_DIR/results.csv"
LOG_DIR="$OUTPUT_DIR/logs"
mkdir -p "$LOG_DIR"

# Ensure spark-events dir exists (created defensively — event logging is
# disabled in the per-query conf, but Spark may still probe the directory)
mkdir -p /tmp/spark-events

echo "Configuration:"
echo " SPARK_HOME: $SPARK_HOME"
echo " COMET_JAR: $COMET_JAR"
echo " TPCH_DATA: $TPCH_DATA"
echo " LOCAL_CORES: $LOCAL_CORES"
echo " DRIVER_MEMORY: $DRIVER_MEMORY"
echo " OFFHEAP_SIZES: $OFFHEAP_SIZES"
echo " QUERIES: $QUERIES"
echo " OUTPUT_DIR: $OUTPUT_DIR"
echo ""

# CSV header
echo "engine,offheap_size,query,peak_rss_mb,wall_time_sec,exit_code" > "$RESULTS_FILE"

# Run a single query under the given engine/off-heap configuration and append
# peak RSS, wall time, and exit code as one CSV row. A row is written even on
# failure so partial sweeps still yield usable data.
#
# Args:
#   $1  engine     - "spark" (baseline) or "comet"
#   $2  offheap    - spark.memory.offHeap.size value, e.g. "8g"
#   $3  query_num  - TPC-H query number (1-22)
#
# NOTE(review): this relies on macOS/BSD `/usr/bin/time -l`, which reports
# "maximum resident set size" in bytes. GNU time on Linux uses `-v` and a
# different wording/unit; the parsing below degrades to "N/A" in that case
# instead of crashing — confirm before running on Linux.
run_query() {
    local engine="$1"
    local offheap="$2"
    local query_num="$3"
    local label="${engine}-offheap${offheap}-q${query_num}"
    local log_file="$LOG_DIR/${label}.log"
    local time_file="$LOG_DIR/${label}.time"

    # Common Spark conf
    local conf=(
        --master "local[${LOCAL_CORES}]"
        --driver-memory "$DRIVER_MEMORY"
        --conf "spark.memory.offHeap.enabled=true"
        --conf "spark.memory.offHeap.size=${offheap}"
        --conf "spark.eventLog.enabled=false"
        --conf "spark.ui.enabled=false"
    )

    if [ "$engine" = "comet" ]; then
        conf+=(
            --jars "$COMET_JAR"
            --driver-class-path "$COMET_JAR"
            --conf "spark.driver.extraClassPath=$COMET_JAR"
            --conf "spark.plugins=org.apache.spark.CometPlugin"
            --conf "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
            --conf "spark.comet.scan.impl=native_datafusion"
            --conf "spark.comet.expression.Cast.allowIncompatible=true"
        )
    fi

    conf+=(
        "$SCRIPT_DIR/tpcbench.py"
        --name "$label"
        --benchmark tpch
        --data "$TPCH_DATA"
        --format parquet
        --output "$OUTPUT_DIR"
        --iterations 1
        --query "$query_num"
    )

    echo -n " $label ... "

    # Run with /usr/bin/time -l, capture stderr (where time writes) separately.
    # The exit code is captured with `|| ...` so `set -e` does not abort the
    # whole sweep when a single query fails.
    local exit_code=0
    /usr/bin/time -l "$SPARK_HOME/bin/spark-submit" "${conf[@]}" \
        > "$log_file" 2> "$time_file" || exit_code=$?

    # Parse peak RSS from /usr/bin/time -l output (macOS format: bytes).
    # FIX: validate that the extracted field is strictly numeric before doing
    # arithmetic — a non-numeric value would make $((...)) fail and abort the
    # entire script under `set -e`. (The previous `-n` check only caught the
    # empty case.)
    local peak_rss_bytes
    peak_rss_bytes=$(grep "maximum resident set size" "$time_file" | awk '{print $1}') || true
    local peak_rss_mb="N/A"
    if [[ "$peak_rss_bytes" =~ ^[0-9]+$ ]]; then
        peak_rss_mb=$((peak_rss_bytes / 1048576))
    fi

    # Parse wall clock time (macOS format: " 89.94 real 363.21 user 10.80 sys")
    local wall_secs="N/A"
    local wall_time
    wall_time=$(grep "real" "$time_file" | tail -1 | awk '{print $1}') || true
    if [ -n "$wall_time" ]; then
        wall_secs="$wall_time"
    fi

    if [ "$exit_code" -eq 0 ]; then
        echo "RSS=${peak_rss_mb}MB, time=${wall_secs}s"
    else
        echo "FAILED (exit=$exit_code), RSS=${peak_rss_mb}MB"
    fi

    echo "${engine},${offheap},${query_num},${peak_rss_mb},${wall_secs},${exit_code}" >> "$RESULTS_FILE"
}

# Main sweep: Spark baseline first, then Comet once per requested off-heap size.
echo "=== Running Spark baseline (no Comet) ==="
for query_num in $QUERIES; do
    run_query "spark" "4g" "$query_num"
done

echo ""
echo "=== Running Comet with varying offHeap sizes ==="
for size in $OFFHEAP_SIZES; do
    echo "--- offHeap = $size ---"
    for query_num in $QUERIES; do
        run_query "comet" "$size" "$query_num"
    done
    echo ""
done

# Pretty-print the CSV summary and point the user at the raw artifacts.
echo "=== Results ==="
column -t -s, "$RESULTS_FILE"
echo ""
echo "Full results: $RESULTS_FILE"
echo "Logs: $LOG_DIR/"
9 changes: 9 additions & 0 deletions docs/source/contributor-guide/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ memory reservations. This happens on whatever thread the operator is executing o
are thread-safe (they use stored `GlobalRef`s, not thread-locals), but they do trigger
`AttachCurrentThread`.

**Spill coordination between Spark and DataFusion.** When Spark's memory manager needs to
reclaim memory, it calls `NativeMemoryConsumer.spill()` on a Spark thread. This routes through
JNI to `SpillState::request_spill()`, which sets a pressure value and blocks. On the native
side, `CometUnifiedMemoryPool::try_grow()` checks `SpillState::pressure()` and returns
`ResourcesExhausted` when pressure is set, causing DataFusion operators to spill. As operators
call `shrink()`, freed bytes are recorded in `SpillState` and the waiting Spark thread is
notified via a `Condvar`. This coordination crosses the JVM/tokio thread boundary — the
requesting thread is a Spark task thread while the responding threads are tokio workers.

**Scalar subqueries call into the JVM.** `Subquery::evaluate()` calls static methods on
`CometScalarSubquery` via JNI. These use a static `HashMap`, not thread-locals, so they are
safe from any thread.
Expand Down
47 changes: 44 additions & 3 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ use std::{sync::Arc, task::Poll};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

use crate::execution::memory_pools::spill::SpillState;
use crate::execution::memory_pools::{
create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig,
create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config,
MemoryPoolConfig, MemoryPoolType,
};
use crate::execution::operators::{ScanExec, ShuffleScanExec};
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
Expand Down Expand Up @@ -177,6 +179,8 @@ struct ExecutionContext {
pub explain_native: bool,
/// Memory pool config
pub memory_pool_config: MemoryPoolConfig,
/// Spill state for coordinating spill requests between Spark and native operators
pub spill_state: Option<Arc<SpillState>>,
/// Whether to log memory usage on each call to execute_plan
pub tracing_enabled: bool,
}
Expand Down Expand Up @@ -257,8 +261,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_limit,
memory_limit_per_task,
)?;
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
let spill_state = if off_heap_mode != JNI_FALSE
&& matches!(memory_pool_config.pool_type, MemoryPoolType::GreedyUnified)
{
Some(Arc::new(SpillState::new()))
} else {
None
};

let memory_pool = create_memory_pool(
&memory_pool_config,
task_memory_manager,
task_attempt_id,
spill_state.clone(),
);

let memory_pool = if logging_memory_pool {
Arc::new(LoggingMemoryPool::new(task_attempt_id as u64, memory_pool))
Expand Down Expand Up @@ -326,6 +342,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
debug_native,
explain_native,
memory_pool_config,
spill_state,
tracing_enabled,
});

Expand Down Expand Up @@ -706,6 +723,30 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(
})
}

/// Called from `CometTaskMemoryManager.spill()` via JNI to request that native
/// operators free memory. Returns the number of bytes actually freed.
///
/// Returns 0 when the requested size is non-positive or when no spill state
/// exists for this plan (i.e. the pool type is not the unified pool).
///
/// # Safety
/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_comet_Native_requestSpill(
    e: JNIEnv,
    _class: JClass,
    exec_context: jlong,
    size: jlong,
) -> jlong {
    try_unwrap_or_throw(&e, |_env| {
        let exec_context = get_execution_context(exec_context);
        // FIX: a negative `size` cast with `as usize` would wrap to a huge
        // value and request a massive over-spill; treat non-positive requests
        // as a no-op instead.
        let bytes = match usize::try_from(size) {
            Ok(b) if b > 0 => b,
            _ => return Ok(0i64),
        };
        if let Some(ref spill_state) = exec_context.spill_state {
            // Bound the wait so a stalled native side cannot hang the
            // requesting Spark task thread indefinitely.
            let timeout = std::time::Duration::from_secs(10);
            let freed = spill_state.request_spill(bytes, timeout);
            Ok(freed as jlong)
        } else {
            // No spill state (not using unified pool) — can't spill
            Ok(0i64)
        }
    })
}

/// Updates the metrics of the query plan.
fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> {
if let Some(native_query) = &exec_context.root_op {
Expand Down
12 changes: 9 additions & 3 deletions native/core/src/execution/memory_pools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod config;
mod fair_pool;
pub mod logging_pool;
pub(crate) mod spill;
mod task_shared;
mod unified_pool;

Expand All @@ -27,6 +28,7 @@ use datafusion::execution::memory_pool::{
use fair_pool::CometFairMemoryPool;
use jni::objects::GlobalRef;
use once_cell::sync::OnceCell;
use spill::SpillState;
use std::num::NonZeroUsize;
use std::sync::Arc;
use unified_pool::CometUnifiedMemoryPool;
Expand All @@ -38,13 +40,17 @@ pub(crate) fn create_memory_pool(
memory_pool_config: &MemoryPoolConfig,
comet_task_memory_manager: Arc<GlobalRef>,
task_attempt_id: i64,
spill_state: Option<Arc<SpillState>>,
) -> Arc<dyn MemoryPool> {
const NUM_TRACKED_CONSUMERS: usize = 10;
match memory_pool_config.pool_type {
MemoryPoolType::GreedyUnified => {
// Set Comet memory pool for native
let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
let spill_state = spill_state.unwrap_or_else(|| Arc::new(SpillState::new()));
let memory_pool = CometUnifiedMemoryPool::new(
comet_task_memory_manager,
task_attempt_id,
spill_state,
);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
Expand Down
Loading
Loading