feat: Add immediate mode option for native shuffle#3845
feat: Add immediate mode option for native shuffle#3845andygrove wants to merge 38 commits intoapache:mainfrom
Conversation
Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling.
…arquet - Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit the number of batches buffered before spilling, allowing earlier spilling to reduce peak memory usage on executors - Fix too-many-open-files: close spill file FD after each spill and reopen in append mode, rather than holding one FD open per partition - Refactor shuffle_bench to stream directly from Parquet instead of loading all input data into memory; remove synthetic data generation - Add --max-buffered-batches CLI arg to shuffle_bench - Add shuffle benchmark documentation to README
Merge latest from apache/main, resolve conflicts, and strip out COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing. This branch now only adds the shuffle benchmark binary.
Spawns N parallel shuffle tasks to simulate executor parallelism. Each task reads the same input and writes to its own output files. Extracts core shuffle logic into shared async helper to avoid code duplication between single and concurrent paths.
…ting, and spilling
… read_ipc_compressed
Add CometConf config to choose between 'immediate' and 'buffered' shuffle partitioner modes. The config flows through protobuf to the native shuffle writer factory, which selects ImmediateModePartitioner or MultiPartitionShuffleRepartitioner accordingly. Defaults to 'immediate' mode.
… mode The slicing loop in insert_batch was redundant because BatchCoalescer already handles chunking. Pass the batch directly. Also remove the BufReader wrapper around spill files since std::io::copy handles buffering internally.
- Remove 1MB pre-allocation per partition buffer (Vec::new instead of Vec::with_capacity), eliminating num_partitions × 1MB of untracked heap allocation - Track buffer Vec capacity growth instead of content bytes written, giving the memory pool accurate visibility into actual allocations - Drop spill files immediately after combining into final output rather than waiting for partitioner drop - Delete benchmark output files after each iteration to avoid filling disk during multi-iteration runs
Replace interleave_record_batch + BatchCoalescer pipeline with a Gluten-inspired scatter-write design: - Pre-allocate Arrow array builders per partition - Scatter-write rows directly from input batches into partition builders - Flush full partitions as compressed IPC blocks, reusing builder capacity - Track builder + IPC memory growth for spill decisions This eliminates per-batch intermediate array allocations (previously 200 interleave calls per input batch) and coalescer builder overhead.
Restructure repartition_batch to iterate column-first instead of partition-first. Each column's type dispatch now happens once per batch (16 times) instead of once per partition per column (3,200 times).
For types without direct scatter-append support (List, Map, Struct, Dictionary, etc.), fall back to arrow::compute::take to produce sub-arrays which are accumulated and concatenated at flush time. Primitive and string types continue using the fast scatter path.
Update the tuning guide and contributor guide to describe the two native shuffle partitioner modes (immediate and buffered), including architecture diagrams, data flow, memory characteristics, and configuration.
|
@Kontinuation fyi |
|
@milenkovicm you may be interested in this since it could also be applied to Ballista |
|
thanks @andygrove will have a look, |
milenkovicm
left a comment
There was a problem hiding this comment.
- written blocks are not ordered by partition, am i correct (perhaps documentation about format of data file and index file could be added)
- at the moment each written block will have schema definition included, would it be possible to have a "specialised" stream writer which does not write schema (as it is same for all blocks) ?
- can spill write to result file instead of temporary file ?
The final file contains data ordered by partition. I will improve the docs.
It should be possible to move to the Arrow IPC Stream approach where schema is written once per partition. I have experimented with this in the past but it is quite a big change.
Yes, if there is enough memory. We only spill to the temp files if the memory pool rejects a request to try_grow. |
Revert metrics instrumentation changes to the buffered shuffle partitioner so this branch only contains the new immediate mode implementation and benchmark. Fix docs that incorrectly described immediate mode as writing directly to disk.
Kontinuation
left a comment
There was a problem hiding this comment.
If I understand it correctly, this is quite similar to what we did before switching to interleave_batch-based repartitioning.
One concern is the memory bloat problem problem when the number of partitions is large. It is quite common to repartition the data into 1000s of partitions when working with large datasets in Spark. The memory reserved by the builder would consume lots of memory even before we start to append data into them. That was the motivation for me to implement the interleave_batch-based approach. See also #887 for details.
The immediate mode works better for small number of partitions than interleave_batch-based approach, so I think it is a great to have feature. I would suggest that we invest in implementing sort-based repartitioning to scale better for large number of partitions and better performance.
| │ Writes IPC blocks as │ Buffers all rows in memory │ | ||
| │ batches arrive │ before writing │ | ||
| ├───────────────────────────┴───────────────────────────────────────────┤ |
There was a problem hiding this comment.
| │ Writes IPC blocks as │ Buffers all rows in memory │ | |
| │ batches arrive │ before writing │ | |
| ├───────────────────────────┴───────────────────────────────────────────┤ | |
| │ Writes IPC blocks as │ Buffers all rows in memory │ | |
| │ batches arrive │ before writing │ | |
| ├───────────────────────────┴───────────────────────────────────────────┤ |
| match &self.partitioning { | ||
| CometPartitioning::Hash(exprs, num_output_partitions) => { | ||
| let num_output_partitions = *num_output_partitions; | ||
| let arrays = exprs | ||
| .iter() | ||
| .map(|expr| expr.evaluate(batch)?.into_array(num_rows)) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| let hashes_buf = &mut self.hashes_buf[..num_rows]; | ||
| hashes_buf.fill(42_u32); | ||
| create_murmur3_hashes(&arrays, hashes_buf)?; | ||
| let partition_ids = &mut self.partition_ids[..num_rows]; | ||
| for (idx, hash) in hashes_buf.iter().enumerate() { | ||
| partition_ids[idx] = | ||
| comet_partitioning::pmod(*hash, num_output_partitions) as u32; | ||
| } | ||
| Ok(num_output_partitions) | ||
| } | ||
| CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { | ||
| let num_output_partitions = *num_output_partitions; | ||
| let max_hash_columns = *max_hash_columns; | ||
| let num_columns_to_hash = if max_hash_columns == 0 { | ||
| batch.num_columns() | ||
| } else { | ||
| max_hash_columns.min(batch.num_columns()) | ||
| }; | ||
| let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash) | ||
| .map(|i| Arc::clone(batch.column(i))) | ||
| .collect(); | ||
| let hashes_buf = &mut self.hashes_buf[..num_rows]; | ||
| hashes_buf.fill(42_u32); | ||
| create_murmur3_hashes(&columns_to_hash, hashes_buf)?; | ||
| let partition_ids = &mut self.partition_ids[..num_rows]; | ||
| for (idx, hash) in hashes_buf.iter().enumerate() { | ||
| partition_ids[idx] = | ||
| comet_partitioning::pmod(*hash, num_output_partitions) as u32; | ||
| } | ||
| Ok(num_output_partitions) | ||
| } | ||
| CometPartitioning::RangePartitioning( | ||
| lex_ordering, | ||
| num_output_partitions, | ||
| row_converter, | ||
| bounds, | ||
| ) => { | ||
| let num_output_partitions = *num_output_partitions; | ||
| let arrays = lex_ordering | ||
| .iter() | ||
| .map(|expr| expr.expr.evaluate(batch)?.into_array(num_rows)) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| let row_batch = row_converter.convert_columns(arrays.as_slice())?; | ||
| let partition_ids = &mut self.partition_ids[..num_rows]; | ||
| for (row_idx, row) in row_batch.iter().enumerate() { | ||
| partition_ids[row_idx] = bounds | ||
| .as_slice() | ||
| .partition_point(|bound| bound.row() <= row) | ||
| as u32; | ||
| } | ||
| Ok(num_output_partitions) | ||
| } | ||
| other => Err(DataFusionError::NotImplemented(format!( | ||
| "Unsupported shuffle partitioning scheme {other:?}" | ||
| ))), | ||
| } |
There was a problem hiding this comment.
I suggest that we move partition ID computation to a separate utility to avoid repeating the same logic in multi partition mode and immediate mode.
| "Failed to open spill file for reading: {e}" | ||
| )) | ||
| })?; | ||
| let mut write_timer = self.metrics.write_time.timer(); |
There was a problem hiding this comment.
The write_timer seems to only count the time spent merging spill files. Is that enough?
| 3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner | ||
| based on the `partitionerMode` configuration: | ||
| - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning. | ||
| As each batch arrives, rows are scattered into per-partition Arrow array builders. When a |
There was a problem hiding this comment.
When a partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC block to an in-memory buffer.
The current IPC writer uses block compression (compressing each batch), which may lead to poor compression ratios. In gluten, the shuffle writer first serializes and buffers the batches, then performs streming compression during eviction, achieving better compression ratios. I'm not entirely sure which is better.
Which issue does this PR close?
Part of #3855
Rationale for this change
The current
MultiPartitionShuffleRepartitionertries to buffer all input batches in memory before writing partitioned output duringshuffle_write. This is relatively efficient, becauseinterleave_record_batchescan create full size output batches, avoiding the need to coalesce later. However, this uses a lot of memory.This PR introduces a new
ImmediateModePartitioner, which takes a different approach: it partitions incoming batches immediately and uses re-usable builders per output partition. These builders get flushed when they reach the target batch size. The batches then get encoded and compressed and these compressed batches are buffered instead of the uncompressed incoming batches, reducing memory overhead and reducing spilling.Note that the default shuffle is still
buffered. I plan on creating a separate PR to change the default once this has had more testing. All CI tests did pass with the default asimmediate.What changes are included in this PR?
buffered)Benchmark Results
I used variations of the following command to run the benchmarks on macOS (M3 Ultra).
Memory usage
Throughput
TPC
I ran benchmarks with TPC-H @ 1TB in AWS and saw no regressions
How are these changes tested?