51 commits
ae0741b
feat: add standalone shuffle benchmark binary for profiling
andygrove Mar 21, 2026
9b5b305
feat: add --limit option to shuffle benchmark (default 1M rows)
andygrove Mar 21, 2026
e1ab490
perf: apply limit during parquet read to avoid scanning all files
andygrove Mar 21, 2026
b7682f4
feat: move shuffle_bench binary into shuffle crate
andygrove Mar 23, 2026
ca36cbd
chore: add comment explaining parquet/rand deps in shuffle crate
andygrove Mar 23, 2026
7225afd
Merge remote-tracking branch 'apache/main' into shuffle-bench-binary
andygrove Mar 26, 2026
6e8bed2
perf: add max_buffered_batches config and stream shuffle bench from p…
andygrove Mar 26, 2026
16ce30f
merge apache/main, remove max_buffered_batches changes
andygrove Mar 27, 2026
2ef57e7
cargo fmt
andygrove Mar 27, 2026
9136e10
prettier
andygrove Mar 27, 2026
7e16819
machete
andygrove Mar 27, 2026
22fe804
feat: add --concurrent-tasks flag to shuffle benchmark
andygrove Mar 28, 2026
58ab927
show metrics
andygrove Mar 30, 2026
c469077
improve metrics
andygrove Mar 30, 2026
4ed08c5
feat: add PartitionOutputStream with IPC serialization and compression
andygrove Mar 30, 2026
b0d4ab2
feat: implement ImmediateModePartitioner with partition eval, row rou…
andygrove Mar 30, 2026
56574d5
feat: export ImmediateModePartitioner and document factory wiring
andygrove Mar 30, 2026
cdbbdee
test: verify ImmediateModePartitioner block format compatibility with…
andygrove Mar 30, 2026
4bcb014
feat: add spark.comet.exec.shuffle.partitionerMode config
andygrove Mar 30, 2026
f2e459e
feat: add batch coalescing to PartitionOutputStream
andygrove Mar 30, 2026
9ccdda7
feat: add --mode flag to shuffle benchmark for immediate vs buffered
andygrove Mar 30, 2026
cf56e39
feat: add upfront memory reservation for coalescer buffers
andygrove Mar 30, 2026
f9139d7
format
andygrove Mar 30, 2026
76815a9
refactor: remove unnecessary batch slicing and BufReader in immediate…
andygrove Mar 30, 2026
42289ee
refactor: remove BufWriter from shuffle output and unused batch_size …
andygrove Mar 31, 2026
27484ce
fix: improve memory tracking and cleanup in immediate mode partitioner
andygrove Mar 31, 2026
033cda9
merge: integrate latest changes from apache/main
andygrove Mar 31, 2026
aa49a72
feat: scatter-write partitioner for immediate mode shuffle
andygrove Mar 31, 2026
4509985
refactor: column-first loop in scatter-write partitioner
andygrove Mar 31, 2026
e45d733
fix: update test for column-first scatter-write API
andygrove Mar 31, 2026
32194bf
fix: support complex types (List, Map, Struct) in scatter-write
andygrove Mar 31, 2026
7ea12ac
fix: skip spill test under miri (copy_file_range unsupported)
andygrove Mar 31, 2026
d06bfdb
fix: ignore miri for test_partition_output_stream_write_and_read
andygrove Apr 1, 2026
e66a6f3
Merge remote-tracking branch 'apache/main' into immediate-mode-partit…
andygrove Apr 1, 2026
17264d6
docs: document buffered vs immediate native shuffle partitioner modes
andygrove Apr 1, 2026
2b6f774
refactor: revert metrics changes and fix imprecise docs
andygrove Apr 1, 2026
cbd75cf
chore: format markdown table alignment in native_shuffle docs
andygrove Apr 1, 2026
bbb4d7f
docs: design spec for one IPC stream per partition shuffle format
andygrove Apr 2, 2026
38e5285
docs: add validation skip requirement to shuffle stream reader spec
andygrove Apr 2, 2026
26a9032
docs: implementation plan for IPC stream per partition shuffle format
andygrove Apr 2, 2026
bd8b6ec
feat: enable Arrow IPC compression feature for shuffle format
andygrove Apr 2, 2026
16eae59
feat: replace custom shuffle block format with Arrow IPC streams
andygrove Apr 2, 2026
a2ab532
feat: add JniInputStream and ShuffleStreamReader for shuffle read path
andygrove Apr 2, 2026
1777b75
feat: update JVM read side to use streaming shuffle decode API
andygrove Apr 2, 2026
0f33006
fix: resolve clippy warnings and update shuffle_scan tests for new IP…
andygrove Apr 2, 2026
4ac3867
feat: update ShuffleScanExec to use ShuffleStreamReader for Arrow IPC…
andygrove Apr 2, 2026
ef35a42
fix: apply spotless formatting
andygrove Apr 2, 2026
63545ac
fix: handle empty streams and concatenated IPC streams in shuffle reader
andygrove Apr 2, 2026
a1c76b2
refactor: clean up shuffle format migration dead code and review find…
andygrove Apr 2, 2026
bca76af
chore: remove unrelated files accidentally committed
andygrove Apr 2, 2026
c264d31
format
andygrove Apr 2, 2026
18 changes: 15 additions & 3 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -434,11 +434,11 @@ object CometConf extends ShimCometConf {
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
.doc(
"The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
"snappy are supported. Compression can be disabled by setting " +
"The codec of Comet native shuffle used to compress shuffle data. " +
"Supported codecs: lz4, zstd. Compression can be disabled by setting " +
"spark.shuffle.compress=false.")
.stringConf
.checkValues(Set("zstd", "lz4", "snappy"))
.checkValues(Set("zstd", "lz4"))
.createWithDefault("lz4")

val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
@@ -523,6 +523,18 @@ object CometConf extends ShimCometConf {
"Should not be larger than batch size `spark.comet.batchSize`")
.createWithDefault(8192)

val COMET_SHUFFLE_PARTITIONER_MODE: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.partitionerMode")
.category(CATEGORY_SHUFFLE)
.doc(
"The partitioner mode used by the native shuffle writer. " +
"'immediate' writes partitioned IPC blocks immediately as batches arrive, " +
"reducing memory usage. 'buffered' buffers all rows before writing, which may " +
"improve performance for small datasets but uses more memory.")
.stringConf
.checkValues(Set("immediate", "buffered"))
.createWithDefault("immediate")

val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
.category(CATEGORY_SHUFFLE)
98 changes: 70 additions & 28 deletions docs/source/contributor-guide/native_shuffle.md
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
└─────────────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌───────────────────────────────────┐ ┌───────────────────────────────────┐
│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │
│ (hash/range partitioning) │ │ (single partition case) │
└───────────────────────────────────┘ └───────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────┐
│ Partitioner Selection │
│ Controlled by spark.comet.exec.shuffle.partitionerMode │
├───────────────────────────┬───────────────────────────────────────────┤
│ immediate (default) │ buffered │
│ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │
│ (hash/range/round-robin) │ (hash/range/round-robin) │
│ Writes IPC blocks as │ Buffers all rows in memory │
│ batches arrive │ before writing │
├───────────────────────────┴───────────────────────────────────────────┤
│ SinglePartitionShufflePartitioner (single partition case) │
└───────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────┐
@@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

### Rust Side

| File | Location | Description |
| ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ |
| `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. |
| `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |
| File | Location | Description |
| ----------------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag. |
| `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. |
| `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. |
| `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. |
| `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). |

## Data Flow

@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
based on the `partitionerMode` configuration:
- **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC
block to an in-memory buffer. Under memory pressure, these buffers are spilled to
per-partition temporary files. This keeps memory usage much lower than buffered mode since
data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays.

4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
exceeds the threshold, partitions spill to temporary files.
- **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin
partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes
them in a single pass. When memory pressure exceeds the threshold, partitions spill to
temporary files.

5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
- `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless
of partitioner mode).
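
The row-routing step described above can be sketched as follows. This is a simplified illustration using only the Rust standard library: `DefaultHasher` stands in for Comet's Spark-compatible Murmur3 hash, and plain vectors stand in for per-partition Arrow array builders.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Compute a partition id per row from its key (illustrative only:
/// Comet uses a Spark-compatible Murmur3 hash, not DefaultHasher).
fn partition_ids(keys: &[i64], num_partitions: usize) -> Vec<usize> {
    keys.iter()
        .map(|k| {
            let mut h = DefaultHasher::new();
            k.hash(&mut h);
            (h.finish() as usize) % num_partitions
        })
        .collect()
}

/// Scatter rows into per-partition buffers (stand-ins for Arrow builders).
fn scatter(keys: &[i64], num_partitions: usize) -> Vec<Vec<i64>> {
    let mut parts = vec![Vec::new(); num_partitions];
    for (row, &pid) in partition_ids(keys, num_partitions).iter().enumerate() {
        parts[pid].push(keys[row]);
    }
    parts
}

fn main() {
    let parts = scatter(&[1, 2, 3, 4, 5, 6], 4);
    // Every input row lands in exactly one partition.
    assert_eq!(parts.iter().map(|p| p.len()).sum::<usize>(), 6);
    println!("{:?}", parts);
}
```

In the real partitioner the inner loop is column-first over Arrow arrays rather than row-first over keys, which is cheaper for columnar data.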

4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
- Writes compression type header
- Writes field count header
- Writes compressed IPC stream
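
The three sections listed above can be sketched as a simple layout; the exact header widths and codec encoding used by `ShuffleBlockWriter` are assumptions here, this only shows the write order.

```rust
use std::io::Write;

/// Illustrative shuffle block layout (assumed widths, not Comet's exact
/// encoding): one codec-id byte, a little-endian field count, then the
/// compressed Arrow IPC payload.
fn write_block(out: &mut Vec<u8>, codec_id: u8, num_fields: u32, ipc_payload: &[u8]) {
    out.write_all(&[codec_id]).unwrap();
    out.write_all(&num_fields.to_le_bytes()).unwrap();
    out.write_all(ipc_payload).unwrap();
}

fn main() {
    let mut block = Vec::new();
    write_block(&mut block, 1, 3, b"ipc-bytes");
    // 1 codec byte + 4 field-count bytes + 9 payload bytes.
    assert_eq!(block.len(), 1 + 4 + 9);
}
```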

6. **Output files**: Two files are produced:
5. **Output files**: Two files are produced:
- **Data file**: Concatenated partition data
- **Index file**: Array of 8-byte little-endian offsets marking partition boundaries

7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition
lengths and commits via Spark's `IndexShuffleBlockResolver`.
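
The index file format described in step 5 is simple enough to sketch directly: an array of 8-byte little-endian offsets, from which the JVM side derives each partition's length by subtracting adjacent entries.

```rust
/// Encode partition boundary offsets as the index file does:
/// an array of 8-byte little-endian values.
fn encode_index(offsets: &[u64]) -> Vec<u8> {
    offsets.iter().flat_map(|o| o.to_le_bytes()).collect()
}

/// Partition i's length is offsets[i + 1] - offsets[i].
fn partition_lengths(offsets: &[u64]) -> Vec<u64> {
    offsets.windows(2).map(|w| w[1] - w[0]).collect()
}

fn main() {
    // Three partitions of 100, 50, and 0 bytes in the data file.
    let offsets = [0u64, 100, 150, 150];
    assert_eq!(encode_index(&offsets).len(), 4 * 8);
    assert_eq!(partition_lengths(&offsets), vec![100, 50, 0]);
}
```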

### Read Path
@@ -201,10 +221,31 @@ sizes.

## Memory Management

Native shuffle uses DataFusion's memory management with spilling support:
Native shuffle uses DataFusion's memory management. The memory characteristics differ
between the two partitioner modes:

### Immediate Mode

Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives,
rather than buffering all input rows before writing:

- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the
target batch size. When a builder fills up, it is flushed as a compressed IPC block to an
in-memory buffer.
- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus
the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC
encoding is more compact than raw Arrow arrays.
- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait,
partition builders are flushed and all IPC buffers are drained to per-partition temporary
files on disk.
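
A back-of-the-envelope version of the footprint estimate above: the builder memory scales with partition count and batch size, times an assumed average row width (a parameter introduced here for illustration, not a value Comet computes this way).

```rust
/// Rough builder-footprint estimate for immediate mode. `row_width_bytes`
/// is an assumed average encoded row width, purely illustrative.
fn builder_footprint(num_partitions: u64, batch_size: u64, row_width_bytes: u64) -> u64 {
    num_partitions * batch_size * row_width_bytes
}

fn main() {
    // 200 partitions with 8192-row builders at ~64 bytes/row is
    // about 100 MiB of builder memory, independent of input size.
    let bytes = builder_footprint(200, 8192, 64);
    assert_eq!(bytes, 104_857_600);
}
```

Buffered mode, by contrast, grows with the total input size, which is why immediate mode is the better default for large shuffles.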

### Buffered Mode

Buffered mode holds all input data in memory before writing:

- **Memory pool**: Tracks memory usage across the shuffle operation.
- **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk.
- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`.
- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to
temporary files on disk.
- **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a
partition are concatenated when writing the final output.
- **Scratch space**: Reusable buffers for partition ID computation to reduce allocations.
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads.

## Configuration

| Config | Default | Description |
| ------------------------------------------------- | ------- | ---------------------------------------- |
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |
| Config | Default | Description |
| ------------------------------------------------- | ----------- | ------------------------------------------- |
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.partitionerMode` | `immediate` | Partitioner mode: `immediate` or `buffered` |
| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |

## Comparison with JVM Shuffle

11 changes: 11 additions & 0 deletions docs/source/user-guide/latest/tuning.md
@@ -144,6 +144,17 @@ Comet provides a fully native shuffle implementation, which generally provides t
supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type
partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.

Native shuffle has two partitioner modes, configured via
`spark.comet.exec.shuffle.partitionerMode`:

- **`immediate`** (default): Writes partitioned Arrow IPC blocks to disk immediately as each batch
arrives. This mode uses less memory because it does not need to buffer the entire input before
writing. It is recommended for most workloads, especially large datasets.

- **`buffered`**: Buffers all input rows in memory before partitioning and writing to disk. This
may improve performance for small datasets that fit in memory, but uses significantly more
memory.
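
On the native side, this config value has to be mapped onto a partitioner choice. A minimal sketch of that selection, with illustrative names (the actual wiring lives in `ShuffleWriterExec`), mirroring the Scala-side `checkValues(Set("immediate", "buffered"))` validation:

```rust
#[derive(Debug, PartialEq)]
enum PartitionerMode {
    Immediate,
    Buffered,
}

/// Parse the partitionerMode config value; any other string is rejected,
/// matching the JVM-side checkValues validation.
fn parse_mode(value: &str) -> Result<PartitionerMode, String> {
    match value {
        "immediate" => Ok(PartitionerMode::Immediate),
        "buffered" => Ok(PartitionerMode::Buffered),
        other => Err(format!("invalid partitioner mode: {other}")),
    }
}

fn main() {
    assert_eq!(parse_mode("immediate"), Ok(PartitionerMode::Immediate));
    assert!(parse_mode("snappy").is_err());
}
```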

#### Columnar (JVM) Shuffle

Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
102 changes: 88 additions & 14 deletions native/Cargo.lock