diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs new file mode 100644 index 0000000000..45decfec05 --- /dev/null +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::ShuffleBlockWriter; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use std::fs::OpenOptions; +use std::io::{BufWriter, Seek, Write}; +use tokio::time::Instant; + +/// A partitioner for zero-column schemas (e.g. queries where ColumnPruning removes all columns). +/// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches +/// but contain a valid row count. Accumulates the total row count and writes a single +/// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file. 
+pub(crate) struct EmptySchemaShufflePartitioner { + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + shuffle_block_writer: ShuffleBlockWriter, + num_output_partitions: usize, + total_rows: usize, + metrics: ShufflePartitionerMetrics, +} + +impl EmptySchemaShufflePartitioner { + pub(crate) fn try_new( + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + num_output_partitions: usize, + metrics: ShufflePartitionerMetrics, + codec: crate::CompressionCodec, + ) -> datafusion::common::Result<Self> { + debug_assert!( + schema.fields().is_empty(), + "EmptySchemaShufflePartitioner requires a zero-column schema" + ); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?; + Ok(Self { + output_data_file, + output_index_file, + schema, + shuffle_block_writer, + num_output_partitions, + total_rows: 0, + metrics, + }) + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for EmptySchemaShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + if num_rows > 0 { + self.total_rows += num_rows; + self.metrics.baseline.record_output(num_rows); + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut output_data = BufWriter::new(output_data); + + // Write a single zero-column batch with the accumulated row count to partition 0 + if self.total_rows > 0 { + let batch = RecordBatch::try_new_with_options( + self.schema.clone(), + vec![], + 
&arrow::array::RecordBatchOptions::new().with_row_count(Some(self.total_rows)), + )?; + self.shuffle_block_writer.write_batch( + &batch, + &mut output_data, + &self.metrics.encode_time, + )?; + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + let data_file_length = output_data.stream_position()?; + + // Write index file: partition 0 spans [0, data_file_length), all others are empty + let index_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut index_writer = BufWriter::new(index_file); + index_writer.write_all(&0i64.to_le_bytes())?; + for _ in 0..self.num_output_partitions { + index_writer.write_all(&(data_file_length as i64).to_le_bytes())?; + } + index_writer.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index 3eedef62c7..a0bc652b4b 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+mod empty_schema; mod multi_partition; mod partitioned_batch_iterator; mod single_partition; mod traits; +pub(crate) use empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e649aaac69..837d3b72f3 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -19,7 +19,8 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ - MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, + EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, + SinglePartitionShufflePartitioner, }; use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; @@ -215,6 +216,17 @@ async fn external_shuffle( let schema = input.schema(); let mut repartitioner: Box<dyn ShufflePartitioner> = match &partitioning { + _ if schema.fields().is_empty() => { + log::debug!("found empty schema, overriding {partitioning:?} partitioning with EmptySchemaShufflePartitioner"); + Box::new(EmptySchemaShufflePartitioner::try_new( + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning.partition_count(), + metrics, + codec, + )?) 
+ } any if any.partition_count() == 1 => { Box::new(SinglePartitionShufflePartitioner::try_new( output_data_file, @@ -693,4 +705,160 @@ mod test { } total_rows } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_empty_schema_shuffle_writer() { + use std::fs; + use std::io::Read; + + let num_rows = 1000; + let num_batches = 5; + let num_partitions = 10; + + let schema = Arc::new(Schema::new(Vec::<Field>::new())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + vec![], + &arrow::array::RecordBatchOptions::new().with_row_count(Some(num_rows)), + ) + .unwrap(); + + let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>(); + let partitions = &[batches]; + + let dir = tempfile::tempdir().unwrap(); + let data_file = dir.path().join("data.out"); + let index_file = dir.path().join("index.out"); + + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(), + ))), + CometPartitioning::RoundRobin(num_partitions, 0), + CompressionCodec::Zstd(1), + data_file.to_str().unwrap().to_string(), + index_file.to_str().unwrap().to_string(), + false, + 1024 * 1024, + ) + .unwrap(); + + let config = SessionConfig::new(); + let runtime_env = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + + // Verify data file is non-empty (contains IPC batch with row count) + let mut data = Vec::new(); + fs::File::open(&data_file) + .unwrap() + .read_to_end(&mut data) + .unwrap(); + assert!(!data.is_empty(), "Data file should contain IPC data"); + + // Verify row count survives roundtrip + let total_rows = read_all_ipc_blocks(&data); + assert_eq!( + total_rows, + num_rows * num_batches, + "Row count should survive roundtrip" + ); + + // Verify 
index file structure: num_partitions + 1 offsets + let mut index_data = Vec::new(); + fs::File::open(&index_file) + .unwrap() + .read_to_end(&mut index_data) + .unwrap(); + let expected_index_size = (num_partitions + 1) * 8; + assert_eq!(index_data.len(), expected_index_size); + + // First offset should be 0 + let first_offset = i64::from_le_bytes(index_data[0..8].try_into().unwrap()); + assert_eq!(first_offset, 0); + + // Second offset should equal data file length (partition 0 holds all data) + let data_len = data.len() as i64; + let second_offset = i64::from_le_bytes(index_data[8..16].try_into().unwrap()); + assert_eq!(second_offset, data_len); + + // All remaining offsets should equal data file length (empty partitions) + for i in 2..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + assert_eq!( + offset, data_len, + "Partition {i} offset should equal data length" + ); + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_empty_schema_shuffle_writer_zero_rows() { + use std::fs; + use std::io::Read; + + let num_partitions = 4; + + let schema = Arc::new(Schema::new(Vec::<Field>::new())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + vec![], + &arrow::array::RecordBatchOptions::new().with_row_count(Some(0)), + ) + .unwrap(); + + let batches = vec![batch]; + let partitions = &[batches]; + + let dir = tempfile::tempdir().unwrap(); + let data_file = dir.path().join("data.out"); + let index_file = dir.path().join("index.out"); + + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(), + ))), + CometPartitioning::RoundRobin(num_partitions, 0), + CompressionCodec::Zstd(1), + data_file.to_str().unwrap().to_string(), + index_file.to_str().unwrap().to_string(), + false, + 1024 * 1024, + ) + .unwrap(); + + let config = SessionConfig::new(); + let runtime_env = 
Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + + // Data file should be empty (no rows to write) + let mut data = Vec::new(); + fs::File::open(&data_file) + .unwrap() + .read_to_end(&mut data) + .unwrap(); + assert!(data.is_empty(), "Data file should be empty with zero rows"); + + // Index file should have all-zero offsets + let mut index_data = Vec::new(); + fs::File::open(&index_file) + .unwrap() + .read_to_end(&mut index_data) + .unwrap(); + let expected_index_size = (num_partitions + 1) * 8; + assert_eq!(index_data.len(), expected_index_size); + for i in 0..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + assert_eq!(offset, 0, "All offsets should be 0 with zero rows"); + } + } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index ee29710ee2..751bf4c1f5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -468,4 +468,35 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } } + + // Regression test for https://github.com/apache/datafusion-comet/issues/3846 + test("repartition count") { + withTempPath { dir => + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark + .range(1000) + .selectExpr("id", "concat('name_', id) as name") + .repartition(100) + .write + .parquet(dir.toString) + } + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { + val testDF = 
spark.read.parquet(dir.toString).repartition(10) + // Verify CometShuffleExchangeExec is in the plan + assert( + find(testDF.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.isDefined, + "Expected CometShuffleExchangeExec in the plan") + // Actual validation, no crash + val count = testDF.count() + assert(count == 1000) + // Ensure test df evaluated by Comet + checkSparkAnswerAndOperator(testDF) + } + } + } }