From 953919c29d91604c11fe05b9212f8de019fd4e5f Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 08:21:01 -0700 Subject: [PATCH 1/9] chore: `native_datafusion` fails on repartition + count --- .../comet/exec/CometNativeShuffleSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index efb5fbca8a..d955af89f1 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -474,4 +474,24 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } } + + test("native datafusion scan - repartition count") { + withTempPath { dir => + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark + .range(1000) + .selectExpr("id", "concat('name_', id) as name") + .repartition(100) + .write + .parquet(dir.toString) + } + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { + val count = spark.read.parquet(dir.toString).repartition(10).count() + checkSparkAnswerAndOperator(spark.read.parquet(dir.toString).repartition(10)) + assert(count == 1000) + } + } + } } From 3a3b715243f068279dcb2bd98ab4b71492e1e999 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 11:21:46 -0700 Subject: [PATCH 2/9] chore: `native_datafusion` fails on repartition + count --- .../partitioned_batch_iterator.rs | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 8309a8ed4a..40d61b3c21 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ 
b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -97,15 +97,25 @@ impl Iterator for PartitionedBatchIterator<'_> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } + + // record_batches is guaranteed non-empty when indices is non-empty + // (indices reference rows within the buffered batches) + let schema = self.record_batches[0].schema(); + + let result = if schema.fields().is_empty() { + // For zero-column batches (e.g. COUNT queries), we can't use + // interleave_record_batch because Arrow requires either at least one + // column or an explicit row count. Create the batch directly. + let options = + arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len())); + RecordBatch::try_new_with_options(schema, vec![], &options) + } else { + interleave_record_batch(&self.record_batches, indices) + }; + + self.pos = indices_end; + Some(result.map_err(|e| { + DataFusionError::ArrowError(Box::from(e), Some(DataFusionError::get_back_trace())) + })) } } From c2a8f27b57261b8e2f4051c17881258c2982a970 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 12:31:47 -0700 Subject: [PATCH 3/9] fix branch predictability --- .../shuffle/src/partitioners/partitioned_batch_iterator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 40d61b3c21..ef0e6f425d 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -102,15 +102,15 @@ impl Iterator for PartitionedBatchIterator<'_> { 
// (indices reference rows within the buffered batches) let schema = self.record_batches[0].schema(); - let result = if schema.fields().is_empty() { + let result = if !schema.fields.is_empty() { + interleave_record_batch(&self.record_batches, indices) + } else { // For zero-column batches (e.g. COUNT queries), we can't use // interleave_record_batch because Arrow requires either at least one // column or an explicit row count. Create the batch directly. let options = arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len())); RecordBatch::try_new_with_options(schema, vec![], &options) - } else { - interleave_record_batch(&self.record_batches, indices) }; self.pos = indices_end; From dadbec01b5d34a950a61b259e4441e497732ef12 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 16:48:17 -0700 Subject: [PATCH 4/9] chore: `native_datafusion` fails on repartition + count --- .../org/apache/comet/exec/CometNativeShuffleSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d955af89f1..d39fa8ac00 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -488,9 +488,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { - val count = spark.read.parquet(dir.toString).repartition(10).count() - checkSparkAnswerAndOperator(spark.read.parquet(dir.toString).repartition(10)) + val testDF = spark.read.parquet(dir.toString).repartition(10) + // Actual validation, no crash + val count = testDF.count() assert(count == 1000) + // Ensure test df evaluated by Comet + 
checkSparkAnswerAndOperator(testDF) } } } From 5120fd99b0229ec73a68c46f09485d4ff613543a Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 1 Apr 2026 19:20:36 -0700 Subject: [PATCH 5/9] short circuit --- .../src/partitioners/multi_partition.rs | 91 ++++++++++++++++++- .../partitioned_batch_iterator.rs | 30 ++---- 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 655bee3511..fd05effd09 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -22,7 +22,7 @@ use crate::partitioners::partitioned_batch_iterator::{ use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; -use arrow::array::{ArrayRef, RecordBatch}; +use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::DataFusionError; @@ -106,6 +106,7 @@ impl ScratchSpace { /// A partitioner that uses a hash function to partition data into multiple partitions pub(crate) struct MultiPartitionShuffleRepartitioner { + schema: SchemaRef, output_data_file: String, output_index_file: String, buffered_batches: Vec<RecordBatch>, @@ -125,6 +126,10 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// Total accumulated row count for zero-column schemas (e.g. COUNT queries). + /// When Some, partitioning_batch() skips hashing/expression evaluation and just + /// accumulates the total. shuffle_write() distributes rows evenly across partitions.
+ zero_col_total_rows: Option<usize>, } impl MultiPartitionShuffleRepartitioner { @@ -175,7 +180,14 @@ impl MultiPartitionShuffleRepartitioner { .with_can_spill(true) .register(&runtime.memory_pool); + let zero_col_total_rows = if schema.fields().is_empty() { + Some(0usize) + } else { + None + }; + Ok(Self { + schema, output_data_file, output_index_file, buffered_batches: vec![], @@ -190,6 +202,7 @@ reservation, tracing_enabled, write_buffer_size, + zero_col_total_rows, }) } @@ -203,6 +216,15 @@ impl MultiPartitionShuffleRepartitioner { return Ok(()); } + // For zero-column schemas (e.g. COUNT queries), skip all hashing/expression + // evaluation and just accumulate total rows. Distribution happens at write time. + if let Some(total) = &mut self.zero_col_total_rows { + let num_rows = input.num_rows(); + *total += num_rows; + self.metrics.baseline.record_output(num_rows); + return Ok(()); + } + if input.num_rows() > self.batch_size { return Err(DataFusionError::Internal( "Input batch size exceeds configured batch size. Call `insert_batch` instead." @@ -523,6 +545,69 @@ impl MultiPartitionShuffleRepartitioner { }) } + /// Fast path for zero-column schemas: distribute total_rows evenly across partitions + /// and emit one RecordBatch per partition carrying the row count. + /// Skips interleave, coalesce, and spill entirely.
+ fn shuffle_write_zero_col( + &self, + total_rows: usize, + start_time: Instant, + ) -> datafusion::common::Result<()> { + let num_output_partitions = self.partition_writers.len(); + let mut offsets = vec![0u64; num_output_partitions + 1]; + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); + + // Distribute rows evenly: each partition gets total/N, first (total%N) get one extra + let base = total_rows / num_output_partitions; + let remainder = total_rows % num_output_partitions; + + for i in 0..num_output_partitions { + offsets[i] = output_data.stream_position()?; + let row_count = base + if i < remainder { 1 } else { 0 }; + if row_count > 0 { + let options = RecordBatchOptions::new().with_row_count(Some(row_count)); + let batch = + RecordBatch::try_new_with_options(Arc::clone(&self.schema), vec![], &options)?; + self.shuffle_block_writer.write_batch( + &batch, + &mut output_data, + &self.metrics.encode_time, + )?; + } + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + offsets[num_output_partitions] = output_data.stream_position()?; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(&self.output_index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + for offset in offsets { + output_index.write_all(&(offset as i64).to_le_bytes()[..])?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } + #[cfg(test)] pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { &self.partition_writers @@ -559,6 +644,10 @@ impl ShufflePartitioner for 
MultiPartitionShuffleRepartitioner { with_trace("shuffle_write", self.tracing_enabled, || { let start_time = Instant::now(); + if let Some(total_rows) = self.zero_col_total_rows.take() { + return self.shuffle_write_zero_col(total_rows, start_time); + } + let mut partitioned_batches = self.partitioned_batches(); let num_output_partitions = self.partition_indices.len(); let mut offsets = vec![0; num_output_partitions + 1]; diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index ef0e6f425d..8309a8ed4a 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -97,25 +97,15 @@ impl Iterator for PartitionedBatchIterator<'_> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - - // record_batches is guaranteed non-empty when indices is non-empty - // (indices reference rows within the buffered batches) - let schema = self.record_batches[0].schema(); - - let result = if !schema.fields.is_empty() { - interleave_record_batch(&self.record_batches, indices) - } else { - // For zero-column batches (e.g. COUNT queries), we can't use - // interleave_record_batch because Arrow requires either at least one - // column or an explicit row count. Create the batch directly. 
- let options = - arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len())); - RecordBatch::try_new_with_options(schema, vec![], &options) - }; - - self.pos = indices_end; - Some(result.map_err(|e| { - DataFusionError::ArrowError(Box::from(e), Some(DataFusionError::get_back_trace())) - })) + match interleave_record_batch(&self.record_batches, indices) { + Ok(batch) => { + self.pos = indices_end; + Some(Ok(batch)) + } + Err(e) => Some(Err(DataFusionError::ArrowError( + Box::from(e), + Some(DataFusionError::get_back_trace()), + ))), + } } } From dbe2bf64a5d02015686a116d9ddb98b9d9612526 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 1 Apr 2026 19:33:15 -0700 Subject: [PATCH 6/9] chore: `native_datafusion` fails on repartition + count --- native/shuffle/src/partitioners/multi_partition.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index fd05effd09..1563eebd4e 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -568,8 +568,8 @@ impl MultiPartitionShuffleRepartitioner { let base = total_rows / num_output_partitions; let remainder = total_rows % num_output_partitions; - for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; + for (i, offset) in offsets[..num_output_partitions].iter_mut().enumerate() { + *offset = output_data.stream_position()?; let row_count = base + if i < remainder { 1 } else { 0 }; if row_count > 0 { let options = RecordBatchOptions::new().with_row_count(Some(row_count)); From 88a3ecef1f78b442a3af42e970d4cf3bc25fdb5e Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 2 Apr 2026 10:26:32 -0700 Subject: [PATCH 7/9] chore: `native_datafusion` fails on repartition + count --- .../src/partitioners/multi_partition.rs | 111 ++++-------------- .../partitioned_batch_iterator.rs | 27 +++-- 
.../comet/exec/CometNativeShuffleSuite.scala | 7 ++ 3 files changed, 49 insertions(+), 96 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 1563eebd4e..6e1af38f8e 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -22,7 +22,7 @@ use crate::partitioners::partitioned_batch_iterator::{ use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; -use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; +use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::DataFusionError; @@ -106,7 +106,6 @@ impl ScratchSpace { /// A partitioner that uses a hash function to partition data into multiple partitions pub(crate) struct MultiPartitionShuffleRepartitioner { - schema: SchemaRef, output_data_file: String, output_index_file: String, buffered_batches: Vec<RecordBatch>, @@ -126,10 +125,6 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, - /// Total accumulated row count for zero-column schemas (e.g. COUNT queries). - /// When Some, partitioning_batch() skips hashing/expression evaluation and just - /// accumulates the total. shuffle_write() distributes rows evenly across partitions.
- zero_col_total_rows: Option<usize>, } impl MultiPartitionShuffleRepartitioner { @@ -180,14 +175,7 @@ impl MultiPartitionShuffleRepartitioner { .with_can_spill(true) .register(&runtime.memory_pool); - let zero_col_total_rows = if schema.fields().is_empty() { - Some(0usize) - } else { - None - }; - Ok(Self { - schema, output_data_file, output_index_file, buffered_batches: vec![], @@ -202,7 +190,6 @@ reservation, tracing_enabled, write_buffer_size, - zero_col_total_rows, }) } @@ -216,12 +203,33 @@ impl MultiPartitionShuffleRepartitioner { return Ok(()); } - // For zero-column schemas (e.g. COUNT queries), skip all hashing/expression - // evaluation and just accumulate total rows. Distribution happens at write time. - if let Some(total) = &mut self.zero_col_total_rows { + // For zero-column schemas (e.g. COUNT queries), assign all rows to partition 0. + // No hashing or expression evaluation needed — just route through normal buffering. + if input.num_columns() == 0 { let num_rows = input.num_rows(); - *total += num_rows; self.metrics.baseline.record_output(num_rows); + // All rows go to partition 0: partition_starts = [0, num_rows, num_rows, ...]
+ // partition_row_indices = [0, 1, 2, ..., num_rows-1] + let mut scratch = std::mem::take(&mut self.scratch); + scratch + .partition_starts + .resize(self.partition_indices.len() + 1, 0); + scratch.partition_starts.fill(num_rows as u32); + scratch.partition_starts[0] = 0; + scratch.partition_row_indices.resize(num_rows, 0); + for (i, v) in scratch.partition_row_indices[..num_rows] + .iter_mut() + .enumerate() + { + *v = i as u32; + } + self.buffer_partitioned_batch_may_spill( + input, + &scratch.partition_row_indices[..num_rows], + &scratch.partition_starts, + ) + .await?; + self.scratch = scratch; return Ok(()); } @@ -545,69 +553,6 @@ impl MultiPartitionShuffleRepartitioner { }) } - /// Fast path for zero-column schemas: distribute total_rows evenly across partitions - /// and emit one RecordBatch per partition carrying the row count. - /// Skips interleave, coalesce, and spill entirely. - fn shuffle_write_zero_col( - &self, - total_rows: usize, - start_time: Instant, - ) -> datafusion::common::Result<()> { - let num_output_partitions = self.partition_writers.len(); - let mut offsets = vec![0u64; num_output_partitions + 1]; - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); - - // Distribute rows evenly: each partition gets total/N, first (total%N) get one extra - let base = total_rows / num_output_partitions; - let remainder = total_rows % num_output_partitions; - - for (i, offset) in offsets[..num_output_partitions].iter_mut().enumerate() { - *offset = output_data.stream_position()?; - let row_count = base + if i < remainder { 1 } else { 0 }; - if row_count > 0 { - let options = RecordBatchOptions::new().with_row_count(Some(row_count)); - let batch = - RecordBatch::try_new_with_options(Arc::clone(&self.schema), vec![], 
&options)?; - self.shuffle_block_writer.write_batch( - &batch, - &mut output_data, - &self.metrics.encode_time, - )?; - } - } - - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - write_timer.stop(); - - offsets[num_output_partitions] = output_data.stream_position()?; - - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = BufWriter::new( - File::create(&self.output_index_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, - ); - for offset in offsets { - output_index.write_all(&(offset as i64).to_le_bytes()[..])?; - } - output_index.flush()?; - write_timer.stop(); - - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - - Ok(()) - } - #[cfg(test)] pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { &self.partition_writers @@ -644,10 +589,6 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { with_trace("shuffle_write", self.tracing_enabled, || { let start_time = Instant::now(); - if let Some(total_rows) = self.zero_col_total_rows.take() { - return self.shuffle_write_zero_col(total_rows, start_time); - } - let mut partitioned_batches = self.partitioned_batches(); let num_output_partitions = self.partition_indices.len(); let mut offsets = vec![0; num_output_partitions + 1]; diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 8309a8ed4a..b97b6f6923 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::array::RecordBatch; +use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; @@ -97,15 +97,20 @@ impl Iterator for PartitionedBatchIterator<'_> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } + + // interleave_record_batch requires at least one column or an explicit row count. + // For zero-column batches (e.g. COUNT queries), create the batch directly. + let schema = self.record_batches[0].schema(); + let result = if schema.fields().is_empty() { + let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); + RecordBatch::try_new_with_options(schema, vec![], &options) + } else { + interleave_record_batch(&self.record_batches, indices) + }; + + self.pos = indices_end; + Some(result.map_err(|e| { + DataFusionError::ArrowError(Box::from(e), Some(DataFusionError::get_back_trace())) + })) } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d39fa8ac00..02bf03fa2c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -489,6 +489,13 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { val testDF = spark.read.parquet(dir.toString).repartition(10) + // Verify CometShuffleExchangeExec is in the plan + assert( + 
find(testDF.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.isDefined, + "Expected CometShuffleExchangeExec in the plan") // Actual validation, no crash val count = testDF.count() assert(count == 1000) From 5a04132455a076b3313e8dbcaeea701b33db7830 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 2 Apr 2026 12:29:29 -0700 Subject: [PATCH 8/9] chore: `native_datafusion` fails on repartition + count --- .../src/partitioners/multi_partition.rs | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 6e1af38f8e..a7f0888b41 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -204,32 +204,16 @@ impl MultiPartitionShuffleRepartitioner { } // For zero-column schemas (e.g. COUNT queries), assign all rows to partition 0. - // No hashing or expression evaluation needed — just route through normal buffering. if input.num_columns() == 0 { let num_rows = input.num_rows(); self.metrics.baseline.record_output(num_rows); - // All rows go to partition 0: partition_starts = [0, num_rows, num_rows, ...] 
- // partition_row_indices = [0, 1, 2, ..., num_rows-1] - let mut scratch = std::mem::take(&mut self.scratch); - scratch - .partition_starts - .resize(self.partition_indices.len() + 1, 0); - scratch.partition_starts.fill(num_rows as u32); - scratch.partition_starts[0] = 0; - scratch.partition_row_indices.resize(num_rows, 0); - for (i, v) in scratch.partition_row_indices[..num_rows] - .iter_mut() - .enumerate() - { - *v = i as u32; + let batch_idx = self.buffered_batches.len() as u32; + self.buffered_batches.push(input); + let indices = &mut self.partition_indices[0]; + indices.reserve(num_rows); + for row in 0..num_rows as u32 { + indices.push((batch_idx, row)); } - self.buffer_partitioned_batch_may_spill( - input, - &scratch.partition_row_indices[..num_rows], - &scratch.partition_starts, - ) - .await?; - self.scratch = scratch; return Ok(()); } From f64fe9ec033fdf64cb9dc4a2c6bcf8461cb37a6c Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 2 Apr 2026 14:07:55 -0700 Subject: [PATCH 9/9] chore: `native_datafusion` fails on repartition + count --- native/shuffle/src/partitioners/multi_partition.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index a7f0888b41..c07978337b 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -204,16 +204,15 @@ impl MultiPartitionShuffleRepartitioner { } // For zero-column schemas (e.g. COUNT queries), assign all rows to partition 0. + // The actual index values don't matter — the consumer only uses indices.len() + // as the row count for zero-column batches. 
if input.num_columns() == 0 { let num_rows = input.num_rows(); self.metrics.baseline.record_output(num_rows); let batch_idx = self.buffered_batches.len() as u32; self.buffered_batches.push(input); let indices = &mut self.partition_indices[0]; - indices.reserve(num_rows); - for row in 0..num_rows as u32 { - indices.push((batch_idx, row)); - } + indices.resize(indices.len() + num_rows, (batch_idx, 0)); return Ok(()); }