
feat: reduce shuffle format overhead with one IPC stream per partition [experimental]#3883

Draft
andygrove wants to merge 51 commits into apache:main from andygrove:shuffle-format-overhead

Conversation

andygrove (Member) commented Apr 2, 2026

Which issue does this PR close?

Closes #3882

This follows on from #3845, so that PR needs to be merged first.

Rationale for this change

The current shuffle format writes each batch as an independent Arrow IPC stream, repeating the schema for every batch. This can create larger shuffle files than Spark for some workloads.

What changes are included in this PR?

Write side (Rust)

  • One StreamWriter per partition instead of one per batch — schema is written once, N record batch messages are appended
  • Arrow IPC built-in body compression via IpcWriteOptions (LZ4_FRAME/ZSTD) replaces external compression wrappers (lz4_flex, zstd, snap)
  • All three partitioners updated: immediate mode, buffered mode, single partition
  • ShuffleBlockWriter replaced with slim CompressionCodec enum that wraps IpcWriteOptions
  • Sort-based shuffle path (row.rs) also uses persistent StreamWriter

Read side (Rust + JVM)

  • JniInputStream — a Rust struct implementing std::io::Read that pulls bytes from a JVM InputStream via JNI callbacks, with a 64 KB read-ahead buffer
  • ShuffleStreamReader — wraps Arrow StreamReader<JniInputStream>, handles concatenated IPC streams (from spills) and empty streams
  • NativeBatchDecoderIterator (JVM) — simplified to native handle pattern (open/next/close), removed all manual header parsing
  • ShuffleScanExec (Rust) — uses ShuffleStreamReader directly instead of CometShuffleBlockIterator
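The read-ahead idea behind JniInputStream can be shown with a std-only sketch. Here any `std::io::Read` stands in for the JVM InputStream; the real struct refills its buffer via JNI upcalls:

```rust
use std::io::{self, Read};

const READ_AHEAD: usize = 64 * 1024;

/// Toy stand-in for JniInputStream: wraps an inner reader behind a
/// 64 KB read-ahead buffer, so each upcall to the source fetches a
/// large chunk instead of one small read per caller request.
struct BufferedUpstream<R: Read> {
    inner: R,
    buf: Vec<u8>,
    pos: usize,
    len: usize,
}

impl<R: Read> BufferedUpstream<R> {
    fn new(inner: R) -> Self {
        Self { inner, buf: vec![0; READ_AHEAD], pos: 0, len: 0 }
    }
}

impl<R: Read> Read for BufferedUpstream<R> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.pos == self.len {
            // Refill: one big read from the inner source (one JNI call
            // in the real implementation) instead of many small ones.
            self.len = self.inner.read(&mut self.buf)?;
            self.pos = 0;
            if self.len == 0 {
                return Ok(0); // EOF
            }
        }
        let n = out.len().min(self.len - self.pos);
        out[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```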

Removed

  • Custom 20-byte header format (8-byte length + 8-byte field count + 4-byte codec tag)
  • ShuffleBlockWriter struct and per-batch IPC stream creation
  • CometShuffleBlockIterator.java and its JNI bridge
  • read_ipc_compressed legacy read function
  • Snappy shuffle codec support (Arrow IPC only supports LZ4_FRAME and ZSTD)
  • External compression dependencies (snap, lz4_flex, zstd) from shuffle crate
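For reference, the removed 20-byte header can be sketched as a fixed layout. The field layout comes from the description above; the little-endian byte order is an assumption for illustration, not confirmed by this PR:

```rust
/// Layout of the removed per-batch header (illustrative; little-endian
/// byte order is an assumption, not confirmed by this PR).
struct LegacyBlockHeader {
    compressed_len: u64, // 8-byte length of the compressed IPC payload
    field_count: u64,    // 8-byte number of fields in the batch
    codec_tag: u32,      // 4-byte compression codec identifier
}

fn parse_legacy_header(bytes: &[u8; 20]) -> LegacyBlockHeader {
    LegacyBlockHeader {
        compressed_len: u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
        field_count: u64::from_le_bytes(bytes[8..16].try_into().unwrap()),
        codec_tag: u32::from_le_bytes(bytes[16..20].try_into().unwrap()),
    }
}
```

Every batch paid these 20 bytes plus a full repeated schema; the new format pays the schema cost once per partition.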

How are these changes tested?

Existing tests

Add a `shuffle_bench` binary that benchmarks shuffle write and read
performance independently from Spark, making it easy to profile with
tools like `cargo flamegraph`, `perf`, or `instruments`.

Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating
synthetic data with configurable schema. Covers different scenarios
including compression codecs, partition counts, partitioning schemes,
and memory-constrained spilling.
…arquet

- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit
  the number of batches buffered before spilling, allowing earlier
  spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close spill file FD after each spill and
  reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of
  loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
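The too-many-open-files fix above amounts to reopening the spill file in append mode per spill and dropping the handle right away. A minimal sketch (the path and function name are illustrative):

```rust
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;

/// Sketch of the fix: open the spill file in append mode for each
/// spill and let the handle drop immediately, instead of holding one
/// file descriptor open per partition for the life of the partitioner.
fn append_spill(path: &Path, payload: &[u8]) -> std::io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    file.write_all(payload)?;
    Ok(()) // `file` is dropped here, closing the descriptor
}
```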

Merge latest from apache/main, resolve conflicts, and strip out
COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing.
This branch now only adds the shuffle benchmark binary.

Spawns N parallel shuffle tasks to simulate executor parallelism.
Each task reads the same input and writes to its own output files.
Extracts core shuffle logic into shared async helper to avoid
code duplication between single and concurrent paths.

Add CometConf config to choose between 'immediate' and 'buffered'
shuffle partitioner modes. The config flows through protobuf to the
native shuffle writer factory, which selects ImmediateModePartitioner
or MultiPartitionShuffleRepartitioner accordingly. Defaults to
'immediate' mode.
… mode

The slicing loop in insert_batch was redundant because BatchCoalescer
already handles chunking. Pass the batch directly. Also remove the
BufReader wrapper around spill files since std::io::copy handles
buffering internally.

- Remove 1MB pre-allocation per partition buffer (Vec::new instead of
  Vec::with_capacity), eliminating num_partitions × 1MB of untracked
  heap allocation
- Track buffer Vec capacity growth instead of content bytes written,
  giving the memory pool accurate visibility into actual allocations
- Drop spill files immediately after combining into final output
  rather than waiting for partitioner drop
- Delete benchmark output files after each iteration to avoid filling
  disk during multi-iteration runs
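The accounting change above can be sketched in a few lines: report `Vec` capacity growth (actual heap allocation) to the memory pool rather than the bytes appended. The struct and field names are invented for the example:

```rust
/// Sketch: report Vec *capacity* growth (actual heap allocation) to a
/// memory accountant, not the number of content bytes written.
struct PartitionBuffer {
    buf: Vec<u8>,    // starts empty: no 1MB per-partition pre-allocation
    reserved: usize, // bytes currently reported to the memory pool
}

impl PartitionBuffer {
    fn new() -> Self {
        Self { buf: Vec::new(), reserved: 0 }
    }

    /// Returns the additional bytes that must be reserved from the pool.
    fn write(&mut self, data: &[u8]) -> usize {
        let before = self.buf.capacity();
        self.buf.extend_from_slice(data);
        // Capacity may jump well past len when Vec doubles; that jump,
        // not data.len(), is the real untracked allocation.
        let grown = self.buf.capacity() - before;
        self.reserved += grown;
        grown
    }
}
```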

Replace interleave_record_batch + BatchCoalescer pipeline with a
Gluten-inspired scatter-write design:

- Pre-allocate Arrow array builders per partition
- Scatter-write rows directly from input batches into partition builders
- Flush full partitions as compressed IPC blocks, reusing builder capacity
- Track builder + IPC memory growth for spill decisions

This eliminates per-batch intermediate array allocations (previously
200 interleave calls per input batch) and coalescer builder overhead.
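The scatter-write loop can be illustrated with a std-only toy where plain `Vec`s stand in for Arrow array builders and a moved-out `Vec` stands in for a flushed IPC block:

```rust
/// Toy sketch of the scatter-write design: rows go straight from the
/// input column into per-partition builders, and a partition is flushed
/// once its builder fills. (Here std::mem::take leaves an empty Vec
/// behind; the real code reuses Arrow builder capacity instead.)
const FLUSH_THRESHOLD: usize = 4; // tiny, for illustration only

fn scatter_write(
    values: &[i64],                       // one input column
    partition_ids: &[usize],              // target partition per row
    builders: &mut Vec<Vec<i64>>,         // one builder per partition
    flushed: &mut Vec<(usize, Vec<i64>)>, // (partition, block) stand-in for IPC blocks
) {
    for (&v, &p) in values.iter().zip(partition_ids) {
        builders[p].push(v);
        if builders[p].len() >= FLUSH_THRESHOLD {
            flushed.push((p, std::mem::take(&mut builders[p])));
        }
    }
}
```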

Restructure repartition_batch to iterate column-first instead of
partition-first. Each column's type dispatch now happens once per batch
(16 times) instead of once per partition per column (3,200 times).
For types without direct scatter-append support (List, Map, Struct,
Dictionary, etc.), fall back to arrow::compute::take to produce
sub-arrays which are accumulated and concatenated at flush time.
Primitive and string types continue using the fast scatter path.
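The column-first restructuring boils down to hoisting the type dispatch out of the partition loop. A simplified sketch with a two-variant column enum standing in for Arrow's type dispatch (real code has one builder set per column, not per type):

```rust
/// Sketch of column-first iteration: the type dispatch (the `match`)
/// runs once per column per batch, and the inner loop scatters that
/// column's rows to all partitions, instead of re-dispatching once
/// per partition per column.
enum Column {
    Int(Vec<i64>),
    Str(Vec<String>),
}

fn repartition_batch(
    columns: &[Column],
    partition_ids: &[usize],
    int_out: &mut [Vec<i64>],    // per-partition builders (simplified:
    str_out: &mut [Vec<String>], // one builder set per type)
) {
    for col in columns {
        match col { // one dispatch per column per batch
            Column::Int(vals) => {
                for (row, &p) in partition_ids.iter().enumerate() {
                    int_out[p].push(vals[row]);
                }
            }
            Column::Str(vals) => {
                for (row, &p) in partition_ids.iter().enumerate() {
                    str_out[p].push(vals[row].clone());
                }
            }
        }
    }
}
```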

Update the tuning guide and contributor guide to describe the two
native shuffle partitioner modes (immediate and buffered), including
architecture diagrams, data flow, memory characteristics, and
configuration.

Revert metrics instrumentation changes to the buffered shuffle
partitioner so this branch only contains the new immediate mode
implementation and benchmark. Fix docs that incorrectly described
immediate mode as writing directly to disk.
Addresses apache#3882 — shuffle format overhead with default batch size.

Replace the custom shuffle block format (per-batch IPC streams with
custom length-prefix headers and external compression wrappers) with
standard Arrow IPC streams using built-in body compression.

Key changes:
- Replace ShuffleBlockWriter with CompressionCodec::ipc_write_options()
  that creates IpcWriteOptions with LZ4_FRAME or ZSTD body compression
- Rewrite BufBatchWriter to use a persistent StreamWriter that writes
  the schema once, then appends N record batch messages
- Rewrite PartitionWriter (spill) to create StreamWriter over spill files
- Rewrite PartitionOutputStream (immediate mode) to use persistent
  StreamWriter<Vec<u8>> with lazy creation and drain/finish lifecycle
- Simplify SinglePartitionShufflePartitioner by removing manual batch
  coalescing (handled by BufBatchWriter's BatchCoalescer)
- Update sort-based shuffle in spark_unsafe/row.rs to use StreamWriter
- Remove snappy from shuffle compression options (keep Snappy variant
  in CompressionCodec enum for Parquet writer compatibility)
- Update all tests to use Arrow StreamReader for roundtrip verification
- Update shuffle_bench binary and criterion benchmarks

The old ipc.rs read path is preserved for Task 6. The core crate will
have expected compile errors in shuffle_scan.rs tests and jni_api.rs
due to removed ShuffleBlockWriter export.

Add JniInputStream (implements std::io::Read by pulling bytes from a JVM
InputStream via JNI with 64KB read-ahead buffer) and ShuffleStreamReader
(wraps Arrow StreamReader<JniInputStream> for lifecycle management).

Replace decodeShuffleBlock JNI function with four new streaming functions:
openShuffleStream, nextShuffleStreamBatch, shuffleStreamNumFields, and
closeShuffleStream. The old read_ipc_compressed is retained for the
legacy ShuffleScanExec code path.

Replace decodeShuffleBlock JNI declaration with four new streaming
methods: openShuffleStream, nextShuffleStreamBatch, shuffleStreamNumFields,
and closeShuffleStream. Rewrite NativeBatchDecoderIterator to use a
native handle pattern instead of manual header parsing and ByteBuffer
management.
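The native handle pattern behind open/next/close can be sketched on the Rust side: `open` boxes native state and hands the JVM an opaque `i64`, which later calls pass back. The function names mirror the new JNI functions, but the state here is a toy stand-in for the real stream reader:

```rust
/// Sketch of the native-handle pattern used by the new JNI functions.
struct ShuffleStream {
    batches: std::vec::IntoIter<u32>, // stand-in for StreamReader state
}

fn open_shuffle_stream(batches: Vec<u32>) -> i64 {
    // Box the native state and leak it as a raw pointer the JVM can hold.
    Box::into_raw(Box::new(ShuffleStream { batches: batches.into_iter() })) as i64
}

fn next_shuffle_stream_batch(handle: i64) -> Option<u32> {
    // Safety (illustrative): `handle` must come from open_shuffle_stream
    // and must not have been passed to close_shuffle_stream yet.
    let stream = unsafe { &mut *(handle as *mut ShuffleStream) };
    stream.batches.next()
}

fn close_shuffle_stream(handle: i64) {
    // Reclaim the Box so the native state is dropped exactly once.
    unsafe { drop(Box::from_raw(handle as *mut ShuffleStream)) };
}
```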
… streams

Replace the old CometShuffleBlockIterator-based read path in ShuffleScanExec
with ShuffleStreamReader, which reads standard Arrow IPC streams directly from
JVM InputStreams via JniInputStream. This eliminates the custom per-batch block
format (8-byte length + 8-byte field count + 4-byte codec + compressed IPC)
and the per-batch JNI calls (hasNext/getBuffer) in favor of streaming reads.

Changes:
- CometShuffledRowRDD: return raw InputStream instead of CometShuffleBlockIterator
- CometExecIterator: accept Map[Int, InputStream] instead of Map[Int, CometShuffleBlockIterator]
- ShuffleScanExec (Rust): lazily create ShuffleStreamReader from InputStream GlobalRef,
  read batches via reader.next_batch() instead of JNI block-by-block dance
- Add Send+Sync impls for SharedJniStream/StreamReadAdapter to satisfy ExecutionPlan bounds
…ings

- Hold a single StreamWriter across all batches in process_sorted_row_partition
  instead of creating a fresh writer per batch
- Remove read_ipc_compressed and snap/lz4_flex/zstd dependencies from shuffle crate
- Remove dead CometShuffleBlockIterator.java and its JNI bridge
- Rename shuffle_block_writer.rs to codec.rs to reflect its contents
- Remove unused _write_time parameter from BufBatchWriter write/flush
- Make CompressionCodec::Snappy return an error in ipc_write_options
- Remove Snappy from shuffle writer codec mappings in planner and JNI

andygrove changed the title from "feat: reduce shuffle format overhead with one IPC stream per partition" to "feat: reduce shuffle format overhead with one IPC stream per partition [experimental]" on Apr 2, 2026

Development

Successfully merging this pull request may close these issues.

Current shuffle format has too much overhead with default batch size
