fix: add EmptySchemaShufflePartitioner and test from #3858 (#3893)
mbutrovich wants to merge 5 commits into apache:main
Conversation
Based on grepping logs when I still had it at INFO level, these Spark SQL tests cover this codepath in addition to the unit test we added to CometNativeShuffleSuite:
```rust
/// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches
/// but contain a valid row count. Accumulates the total row count and writes a single
/// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file.
pub(crate) struct EmptySchemaShufflePartitioner {
```
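The doc comment above can be illustrated with a minimal, self-contained sketch of the accumulation idea. The struct and method names below are assumptions for illustration, not the actual Comet implementation: empty-schema batches carry no column data, so the partitioner only needs to sum their row counts and emit one zero-column batch at the end.

```rust
// Sketch only: names and fields are hypothetical, not Comet's real code.
struct EmptySchemaAccumulator {
    total_rows: usize,
}

impl EmptySchemaAccumulator {
    fn new() -> Self {
        Self { total_rows: 0 }
    }

    // In the real partitioner, RecordBatch::num_rows() would supply this.
    fn insert_batch_rows(&mut self, num_rows: usize) {
        self.total_rows += num_rows;
    }

    // On finish, a single zero-column "batch" carrying the total row count
    // is all that needs to reach partition 0.
    fn finish(self) -> usize {
        self.total_rows
    }
}

fn main() {
    let mut acc = EmptySchemaAccumulator::new();
    // Three empty-schema batches with only row counts, e.g. from COUNT(*).
    for rows in [100, 250, 50] {
        acc.insert_batch_rows(rows);
    }
    assert_eq!(acc.finish(), 400);
    println!("accumulated 400 rows");
}
```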
It would be useful to attach a data flow graph or something similar, so we can see how data transforms across the shuffle phases.
I'm not sure what you have in mind for this one, because this partitioner targets a very narrow class of queries. I think there are other resources to read about general Spark shuffle behavior.
```rust
#[async_trait::async_trait]
impl ShufflePartitioner for EmptySchemaShufflePartitioner {
    async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> {
        let start_time = Instant::now();
```
I'm starting to wonder whether we should wrap timings in macros and make them optional 🤔
Timers have cost, but in the grand scheme of Spark jobs that last hours or days, they're not the highest priority to optimize.
```rust
            .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
        let mut index_writer = BufWriter::new(index_file);
        index_writer.write_all(&0i64.to_le_bytes())?;
        for _ in 0..self.num_output_partitions {
```
self.num_output_partitions? Am I right that it should be just 1 partition?
The shuffle writer must write index entries for all target partitions, even if we're accumulating everything into a single batch in the first partition.
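This point can be sketched with a small example. The function below is illustrative only (the real writer's exact file layout may differ): the index file stores a cumulative end offset per partition, so empty partitions repeat the previous offset rather than being omitted, and every target partition still gets an entry.

```rust
// Hypothetical sketch of index-file construction, not Comet's actual code.
// Writes a leading 0, then one cumulative end offset per output partition.
fn build_index(num_output_partitions: usize, partition0_bytes: i64) -> Vec<i64> {
    let mut index = Vec::with_capacity(num_output_partitions + 1);
    index.push(0); // start of partition 0
    for _ in 0..num_output_partitions {
        // Only partition 0 holds data; every later partition is empty,
        // so each entry repeats the same end offset.
        index.push(partition0_bytes);
    }
    index
}

fn main() {
    // With 3 output partitions and a 128-byte batch in partition 0,
    // a reader sees partition 0 as [0, 128) and partitions 1 and 2 as empty.
    assert_eq!(build_index(3, 128), vec![0, 128, 128, 128]);
    println!("index has one entry per partition plus the leading 0");
}
```

If the loop ran only once, readers expecting `num_output_partitions` entries would read a malformed index, which is why the loop covers all target partitions.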
Which issue does this PR close?
Closes #3846.
Rationale for this change
Native shuffle above a native scan that does not project any columns (e.g., `COUNT(*)`) results in `RecordBatch`es with an empty schema but a valid number of rows. Native shuffle currently panics trying to `interleave` those batches, but we can fast-path this scenario with a special partitioner. It is similar to the `SinglePartitionShufflePartitioner`, but instead of concatenating batches to write to a shuffle file for a single partition, it accumulates the number of rows, then writes a single IPC batch for that row count, while making sure the index file has the expected number of partitions.
What changes are included in this PR?
- `native/shuffle/src/partitioners/empty_schema.rs`: new `EmptySchemaShufflePartitioner` that accumulates row count, writes a single zero-column IPC batch to partition 0, and fills the index with equal offsets for all other partitions
- `native/shuffle/src/partitioners/mod.rs`: exports the new partitioner
- `native/shuffle/src/shuffle_writer.rs`: branches on `schema.fields().is_empty()` before falling through to `MultiPartitionShuffleRepartitioner`; added Rust test verifying row count roundtrip and index structure
- `spark/.../CometNativeShuffleSuite.scala`: integration test from PR #3858 (chore: fix native shuffle for batches with no columns and 0 row count) for `repartition(10).count()` with native DataFusion scan
How are these changes tested?
New test from #3858 that reflects repro in #3846.