From 8d71b6550fdbaf503c5111e26d6f7e69e295bbc9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 2 Apr 2026 20:26:17 -0400 Subject: [PATCH 1/4] add EmptySchemaShufflePartitioner and test from #3858 --- .../shuffle/src/partitioners/empty_schema.rs | 135 ++++++++++++++++++ native/shuffle/src/partitioners/mod.rs | 2 + native/shuffle/src/shuffle_writer.rs | 101 ++++++++++++- .../comet/exec/CometNativeShuffleSuite.scala | 32 +++++ 4 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 native/shuffle/src/partitioners/empty_schema.rs diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs new file mode 100644 index 0000000000..45decfec05 --- /dev/null +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::ShuffleBlockWriter; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use std::fs::OpenOptions; +use std::io::{BufWriter, Seek, Write}; +use tokio::time::Instant; + +/// A partitioner for zero-column schemas (e.g. queries where ColumnPruning removes all columns). +/// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches +/// but contain a valid row count. Accumulates the total row count and writes a single +/// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file. +pub(crate) struct EmptySchemaShufflePartitioner { + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + shuffle_block_writer: ShuffleBlockWriter, + num_output_partitions: usize, + total_rows: usize, + metrics: ShufflePartitionerMetrics, +} + +impl EmptySchemaShufflePartitioner { + pub(crate) fn try_new( + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + num_output_partitions: usize, + metrics: ShufflePartitionerMetrics, + codec: crate::CompressionCodec, + ) -> datafusion::common::Result { + debug_assert!( + schema.fields().is_empty(), + "EmptySchemaShufflePartitioner requires a zero-column schema" + ); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?; + Ok(Self { + output_data_file, + output_index_file, + schema, + shuffle_block_writer, + num_output_partitions, + total_rows: 0, + metrics, + }) + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for EmptySchemaShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + if num_rows > 0 { + self.total_rows += num_rows; + self.metrics.baseline.record_output(num_rows); + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut output_data = BufWriter::new(output_data); + + // Write a single zero-column batch with the accumulated row count to partition 0 + if self.total_rows > 0 { + let batch = RecordBatch::try_new_with_options( + self.schema.clone(), + vec![], + &arrow::array::RecordBatchOptions::new().with_row_count(Some(self.total_rows)), + )?; + self.shuffle_block_writer.write_batch( + &batch, + &mut output_data, + &self.metrics.encode_time, + )?; + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + let data_file_length = output_data.stream_position()?; + + // Write index file: partition 0 spans [0, data_file_length), all others are empty + let index_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut index_writer = BufWriter::new(index_file); + index_writer.write_all(&0i64.to_le_bytes())?; + for _ in 0..self.num_output_partitions { + index_writer.write_all(&(data_file_length as i64).to_le_bytes())?; + } + index_writer.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index 3eedef62c7..a0bc652b4b 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +mod empty_schema; mod multi_partition; mod partitioned_batch_iterator; mod single_partition; mod traits; +pub(crate) use empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e649aaac69..2ab48c9162 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -19,7 +19,8 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ - MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, + EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, + SinglePartitionShufflePartitioner, }; use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; @@ -226,6 +227,14 @@ async fn external_shuffle( write_buffer_size, )?) } + _ if schema.fields().is_empty() => Box::new(EmptySchemaShufflePartitioner::try_new( + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning.partition_count(), + metrics, + codec, + )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, @@ -693,4 +702,94 @@ mod test { } total_rows } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_empty_schema_shuffle_writer() { + use std::fs; + use std::io::Read; + + let num_rows = 1000; + let num_batches = 5; + let num_partitions = 10; + + let schema = Arc::new(Schema::new(Vec::::new())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + vec![], + &arrow::array::RecordBatchOptions::new().with_row_count(Some(num_rows)), + ) + .unwrap(); + + let batches = (0..num_batches).map(|_| batch.clone()).collect::>(); + let partitions = &[batches]; + + let dir = tempfile::tempdir().unwrap(); + let data_file = dir.path().join("data.out"); + let index_file = dir.path().join("index.out"); + + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(), + ))), + CometPartitioning::RoundRobin(num_partitions, 0), + CompressionCodec::Zstd(1), + data_file.to_str().unwrap().to_string(), + index_file.to_str().unwrap().to_string(), + false, + 1024 * 1024, + ) + .unwrap(); + + let config = SessionConfig::new(); + let runtime_env = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + + // Verify data file is non-empty (contains IPC batch with row count) + let mut data = Vec::new(); + fs::File::open(&data_file) + .unwrap() + .read_to_end(&mut data) + .unwrap(); + assert!(!data.is_empty(), "Data file should contain IPC data"); + + // Verify row count survives roundtrip + let total_rows = read_all_ipc_blocks(&data); + assert_eq!( + total_rows, + num_rows * num_batches, + "Row count should survive roundtrip" + ); + + // Verify index file structure: num_partitions + 1 offsets + let mut index_data = Vec::new(); + fs::File::open(&index_file) + .unwrap() + .read_to_end(&mut index_data) + .unwrap(); + let expected_index_size = (num_partitions + 1) * 8; + assert_eq!(index_data.len(), expected_index_size); + + // First offset should be 0 + let first_offset = i64::from_le_bytes(index_data[0..8].try_into().unwrap()); + assert_eq!(first_offset, 0); + + // Second offset should equal data file length (partition 0 holds all data) + let data_len = data.len() as i64; + let second_offset = i64::from_le_bytes(index_data[8..16].try_into().unwrap()); + assert_eq!(second_offset, data_len); + + // All remaining offsets should equal data file length (empty partitions) + for i in 2..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + assert_eq!( + offset, data_len, + "Partition {i} offset should equal data length" + ); + } + } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index ee29710ee2..9a6173192a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -468,4 +468,36 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } } + + // Regression test for https://github.com/apache/datafusion-comet/issues/3846 + test("repartition count") { + withTempPath { dir => + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark + .range(1000) + .selectExpr("id", "concat('name_', id) as name") + .repartition(100) + .write + .parquet(dir.toString) + } + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { + val testDF = spark.read.parquet(dir.toString).repartition(10) + // Verify CometShuffleExchangeExec is in the plan + assert( + find(testDF.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.isDefined, + "Expected CometShuffleExchangeExec in the plan") + // Actual validation, no crash + val count = testDF.count() + assert(count == 1000) + // Ensure test df evaluated by Comet + checkSparkAnswerAndOperator(testDF) + println(testDF.queryExecution.executedPlan) + } + } + } } From c2a6c501910524eb7b057c0ecd20cffe8eae6219 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 2 Apr 2026 20:48:41 -0400 Subject: [PATCH 2/4] reorder match, add info logging --- native/shuffle/src/shuffle_writer.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 2ab48c9162..c71f6b83e3 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -41,6 +41,7 @@ use datafusion::{ }; use datafusion_comet_common::tracing::with_trace_async; use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use log::info; use std::{ any::Any, fmt, @@ -216,6 +217,17 @@ async fn external_shuffle( let schema = input.schema(); let mut repartitioner: Box = match &partitioning { + _ if schema.fields().is_empty() => { + info!("found empty schema, overriding with EmptySchemaShufflePartitioner"); + Box::new(EmptySchemaShufflePartitioner::try_new( + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning.partition_count(), + metrics, + codec, + )?) + } any if any.partition_count() == 1 => { Box::new(SinglePartitionShufflePartitioner::try_new( output_data_file, @@ -227,14 +239,6 @@ async fn external_shuffle( write_buffer_size, )?) } - _ if schema.fields().is_empty() => Box::new(EmptySchemaShufflePartitioner::try_new( - output_data_file, - output_index_file, - Arc::clone(&schema), - partitioning.partition_count(), - metrics, - codec, - )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, From 3c76d50d751f51cbbc722dd4e253cb0dbb760527 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 2 Apr 2026 20:49:29 -0400 Subject: [PATCH 3/4] remove println from test --- .../scala/org/apache/comet/exec/CometNativeShuffleSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 9a6173192a..751bf4c1f5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -496,7 +496,6 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper assert(count == 1000) // Ensure test df evaluated by Comet checkSparkAnswerAndOperator(testDF) - println(testDF.queryExecution.executedPlan) } } } From 2c7650b63231ed22e042335e21154931db75a4e1 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 3 Apr 2026 11:32:30 -0400 Subject: [PATCH 4/4] add test with 0 row, reduce logging verbosity to debug --- native/shuffle/src/shuffle_writer.rs | 69 +++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index c71f6b83e3..837d3b72f3 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -41,7 +41,6 @@ use datafusion::{ }; use datafusion_comet_common::tracing::with_trace_async; use futures::{StreamExt, TryFutureExt, TryStreamExt}; -use log::info; use std::{ any::Any, fmt, @@ -218,7 +217,7 @@ async fn external_shuffle( let mut repartitioner: Box = match &partitioning { _ if schema.fields().is_empty() => { - info!("found empty schema, overriding with EmptySchemaShufflePartitioner"); + log::debug!("found empty schema, overriding {partitioning:?} partitioning with EmptySchemaShufflePartitioner"); Box::new(EmptySchemaShufflePartitioner::try_new( output_data_file, output_index_file, @@ -796,4 +795,70 @@ mod test { ); } } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_empty_schema_shuffle_writer_zero_rows() { + use std::fs; + use std::io::Read; + + let num_partitions = 4; + + let schema = Arc::new(Schema::new(Vec::::new())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + vec![], + &arrow::array::RecordBatchOptions::new().with_row_count(Some(0)), + ) + .unwrap(); + + let batches = vec![batch]; + let partitions = &[batches]; + + let dir = tempfile::tempdir().unwrap(); + let data_file = dir.path().join("data.out"); + let index_file = dir.path().join("index.out"); + + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(), + ))), + CometPartitioning::RoundRobin(num_partitions, 0), + CompressionCodec::Zstd(1), + data_file.to_str().unwrap().to_string(), + index_file.to_str().unwrap().to_string(), + false, + 1024 * 1024, + ) + .unwrap(); + + let config = SessionConfig::new(); + let runtime_env = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + + // Data file should be empty (no rows to write) + let mut data = Vec::new(); + fs::File::open(&data_file) + .unwrap() + .read_to_end(&mut data) + .unwrap(); + assert!(data.is_empty(), "Data file should be empty with zero rows"); + + // Index file should have all-zero offsets + let mut index_data = Vec::new(); + fs::File::open(&index_file) + .unwrap() + .read_to_end(&mut index_data) + .unwrap(); + let expected_index_size = (num_partitions + 1) * 8; + assert_eq!(index_data.len(), expected_index_size); + for i in 0..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + assert_eq!(offset, 0, "All offsets should be 0 with zero rows"); + } + } }