feat: Add immediate mode option for native shuffle #3845
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition | |
| └─────────────────────────────────────────────────────────────────────────────┘ | ||
| │ │ | ||
| ▼ ▼ | ||
| ┌───────────────────────────────────┐ ┌───────────────────────────────────┐ | ||
| │ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │ | ||
| │ (hash/range partitioning) │ │ (single partition case) │ | ||
| └───────────────────────────────────┘ └───────────────────────────────────┘ | ||
| ┌───────────────────────────────────────────────────────────────────────┐ | ||
| │ Partitioner Selection │ | ||
| │ Controlled by spark.comet.exec.shuffle.partitionerMode │ | ||
| ├───────────────────────────┬───────────────────────────────────────────┤ | ||
| │ immediate (default) │ buffered │ | ||
| │ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │ | ||
| │ (hash/range/round-robin) │ (hash/range/round-robin) │ | ||
| │ Writes IPC blocks as │ Buffers all rows in memory │ | ||
| │ batches arrive │ before writing │ | ||
| ├───────────────────────────┴───────────────────────────────────────────┤ | ||
| │ SinglePartitionShufflePartitioner (single partition case) │ | ||
| └───────────────────────────────────────────────────────────────────────┘ | ||
| │ | ||
| ▼ | ||
| ┌───────────────────────────────────┐ | ||
|
|
@@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following condition | |
|
|
||
| ### Rust Side | ||
|
|
||
| | File | Location | Description | | ||
| | ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ | | ||
| | `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. | | ||
| | `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | | ||
| | `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | | ||
| | File | Location | Description | | ||
| | ----------------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- | | ||
| | `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag. | | ||
| | `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. | | ||
| | `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. | | ||
| | `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | | ||
| | `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | | ||
|
|
||
| ## Data Flow | ||
|
|
||
|
|
@@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition | |
|
|
||
| 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust. | ||
|
|
||
| 3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner: | ||
| - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning | ||
| - `SinglePartitionShufflePartitioner`: For single partition (simpler path) | ||
| 3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner | ||
| based on the `partitionerMode` configuration: | ||
| - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning. | ||
| As each batch arrives, rows are scattered into per-partition Arrow array builders. When a | ||
|
Member
The current IPC writer uses block compression (each batch is compressed independently), which may lead to poor compression ratios. In Gluten, the shuffle writer first serializes and buffers the batches, then performs streaming compression during eviction, achieving better compression ratios. I'm not entirely sure which is better. |
||
| partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC | ||
| block to an in-memory buffer. Under memory pressure, these buffers are spilled to | ||
| per-partition temporary files. This keeps memory usage much lower than buffered mode since | ||
| data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays. | ||
|
|
||
| 4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure | ||
| exceeds the threshold, partitions spill to temporary files. | ||
| - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin | ||
| partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes | ||
| them in a single pass. When memory pressure exceeds the threshold, partitions spill to | ||
| temporary files. | ||
|
|
||
| 5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: | ||
| - `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless | ||
| of partitioner mode). | ||
|
|
||
| 4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: | ||
| - Writes compression type header | ||
| - Writes field count header | ||
| - Writes compressed IPC stream | ||
|
|
||
| 6. **Output files**: Two files are produced: | ||
| 5. **Output files**: Two files are produced: | ||
| - **Data file**: Concatenated partition data | ||
| - **Index file**: Array of 8-byte little-endian offsets marking partition boundaries | ||
|
|
||
| 7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition | ||
| 6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition | ||
| lengths and commits via Spark's `IndexShuffleBlockResolver`. | ||
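
The index file layout in the output and commit steps can be sketched roughly as follows. This is only an illustration, not Comet's code: `encode_index` and `partition_lengths` are hypothetical names, and the sketch assumes the index holds `num_partitions + 1` cumulative offsets starting at 0, which the text above does not state explicitly.

```rust
/// Encode cumulative partition offsets as consecutive 8-byte little-endian integers.
/// Assumption: one offset per partition boundary, i.e. `num_partitions + 1` entries.
fn encode_index(offsets: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(offsets.len() * 8);
    for offset in offsets {
        out.extend_from_slice(&offset.to_le_bytes());
    }
    out
}

/// Decode the index bytes and derive each partition's length as the difference
/// between adjacent offsets. Assumes the offsets are non-decreasing.
fn partition_lengths(index: &[u8]) -> Vec<u64> {
    let offsets: Vec<u64> = index
        .chunks_exact(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            u64::from_le_bytes(buf)
        })
        .collect();
    offsets.windows(2).map(|pair| pair[1] - pair[0]).collect()
}
```

Under these assumptions, offsets `[0, 10, 10, 42]` describe three partitions of 10, 0, and 32 bytes; an empty partition shows up as two equal adjacent offsets.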
|
|
||
| ### Read Path | ||
|
|
@@ -201,10 +221,31 @@ sizes. | |
|
|
||
| ## Memory Management | ||
|
|
||
| Native shuffle uses DataFusion's memory management with spilling support: | ||
| Native shuffle uses DataFusion's memory management. The memory characteristics differ | ||
| between the two partitioner modes: | ||
|
|
||
| ### Immediate Mode | ||
|
|
||
| Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives, | ||
| rather than buffering all input rows before writing: | ||
|
|
||
| - **Per-partition builders**: Each partition has a set of Arrow array builders sized to the | ||
| target batch size. When a builder fills up, it is flushed as a compressed IPC block to an | ||
| in-memory buffer. | ||
| - **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus | ||
| the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC | ||
| encoding is more compact than raw Arrow arrays. | ||
| - **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait, | ||
| partition builders are flushed and all IPC buffers are drained to per-partition temporary | ||
| files on disk. | ||
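
The scatter-and-flush behaviour described in these bullets can be illustrated with a toy model. It is a sketch under heavy simplification, not the `ImmediateModePartitioner` implementation: `ImmediateSketch` and its methods are hypothetical, plain `Vec<i64>` values stand in for Arrow array builders, a modulo stands in for the real partitioning function, "flushing" only moves rows into a list of blocks instead of IPC-encoding and compressing them, and spilling is not modelled.

```rust
/// Toy stand-in for an immediate-mode partitioner (hypothetical type).
struct ImmediateSketch {
    batch_size: usize,
    /// One in-progress builder per partition.
    builders: Vec<Vec<i64>>,
    /// Per partition: blocks that have already been flushed.
    flushed_blocks: Vec<Vec<Vec<i64>>>,
}

impl ImmediateSketch {
    fn new(num_partitions: usize, batch_size: usize) -> Self {
        ImmediateSketch {
            batch_size,
            builders: (0..num_partitions)
                .map(|_| Vec::with_capacity(batch_size))
                .collect(),
            flushed_blocks: vec![Vec::new(); num_partitions],
        }
    }

    /// Scatter one input batch: each row is appended to its partition's builder,
    /// and a builder that reaches `batch_size` rows is flushed right away.
    fn insert_batch(&mut self, rows: &[i64]) {
        let num_partitions = self.builders.len();
        for &row in rows {
            // Placeholder partitioning: a real hash partitioner would hash the key.
            let partition = row.rem_euclid(num_partitions as i64) as usize;
            self.builders[partition].push(row);
            if self.builders[partition].len() >= self.batch_size {
                self.flush(partition);
            }
        }
    }

    /// Move the builder's rows into a finished block and start a fresh builder.
    fn flush(&mut self, partition: usize) {
        if self.builders[partition].is_empty() {
            return;
        }
        let fresh = Vec::with_capacity(self.batch_size);
        let block = std::mem::replace(&mut self.builders[partition], fresh);
        self.flushed_blocks[partition].push(block);
    }

    /// Rows still held in builders; always below `num_partitions * batch_size`.
    fn buffered_rows(&self) -> usize {
        self.builders.iter().map(Vec::len).sum()
    }
}
```

In this model the builder footprint never exceeds `num_partitions × batch_size` rows regardless of input size, which is the bound the memory footprint bullet refers to; only the flushed blocks grow with the input.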
|
|
||
| ### Buffered Mode | ||
|
|
||
| Buffered mode accumulates input data in memory before writing, spilling to disk under memory pressure: | ||
|
|
||
| - **Memory pool**: Tracks memory usage across the shuffle operation. | ||
| - **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk. | ||
| - **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`. | ||
| - **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to | ||
| temporary files on disk. | ||
| - **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a | ||
| partition are concatenated when writing the final output. | ||
| - **Scratch space**: Reusable buffers for partition ID computation to reduce allocations. | ||
|
|
@@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads. | |
|
|
||
| ## Configuration | ||
|
|
||
| | Config | Default | Description | | ||
| | ------------------------------------------------- | ------- | ---------------------------------------- | | ||
| | `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle | | ||
| | `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` | | ||
| | `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | | ||
| | `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | | ||
| | `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | | ||
| | `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | | ||
| | Config | Default | Description | | ||
| | ------------------------------------------------- | ----------- | ------------------------------------------- | | ||
| | `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle | | ||
| | `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` | | ||
| | `spark.comet.exec.shuffle.partitionerMode` | `immediate` | Partitioner mode: `immediate` or `buffered` | | ||
| | `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | | ||
| | `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | | ||
| | `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | | ||
| | `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | | ||
|
|
||
| ## Comparison with JVM Shuffle | ||
|
|
||
|
|
||