chore: fix native shuffle for batches with no columns and 0 row count #3858
comphead wants to merge 9 commits into apache:main
Conversation
```scala
withSQLConf(
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
  CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") {
```
Is the issue specific to this combination of scan and shuffle?
`interleave_record_batch` is used in other parts of the shuffle codebase, so those may also need updating?
It looks like `native_datafusion` is used here just to easily force native shuffle.
I am confused by the comment `For zero-column batches (e.g. COUNT queries)` when the test isn't using a count.
I was able to reproduce the crash with both `native_datafusion` and `native_iceberg_compat` in combination with native shuffle. The sample query for the repro and test case is:

```scala
spark.read.parquet("hdfs://location").repartition(50).count()
```

Perhaps the test can be slightly improved if it is confusing.
```scala
val count = testDF.count()
assert(count == 1000)
// Ensure test df evaluated by Comet
checkSparkAnswerAndOperator(testDF)
```
There is no usage of `count()` here. Is this intentional?

Another way could be something like:

```scala
val testDF = spark.read.parquet(dir.toString).repartition(10)
val countDF = testDF.selectExpr("count(*) as cnt")
val count = countDF.collect().head.getLong(0)
assert(count == 1000)
checkSparkAnswerAndOperator(countDF)
```
It is intentional, yes. `count()` returns just a `Long`, so I can't really inject in the middle to check the native plan. Instead I check that at least everything before the count is native, which works for this case.
Is there something smarter we could be doing in this scenario? Maybe that's a premature optimization, but it seems a bit silly to me if we could end up writing a bunch of empty IPC batches.
Can we get a more descriptive title and PR description? "thin batches" doesn't really convey what's happening. These are batches with no columns, right?
IMO we filter them out inside the shuffle writer but before IPC. This is a valid point though; perhaps we can move this check up earlier. Checking this.
You could detect
```rust
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data);

// Distribute rows evenly: each partition gets total/N, first (total%N) get one extra
```
How does Spark handle this?
```rust
for (i, offset) in offsets[..num_output_partitions].iter_mut().enumerate() {
    *offset = output_data.stream_position()?;
    let row_count = base + if i < remainder { 1 } else { 0 };
```
This seems like a complicated way to handle the remainder? Why not just toss the remainder in partition 0 and be done with it? This makes the logic harder to follow.
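For reference, a minimal standalone sketch of the even split the quoted code performs (the function name here is mine, not from the PR): each partition gets `total / N` rows, and the first `total % N` partitions get one extra, so row counts never differ by more than one.

```rust
// Sketch only: models how a zero-column batch's row count is split
// across output partitions, per the quoted comment in the PR.
fn split_rows(total_rows: usize, num_output_partitions: usize) -> Vec<usize> {
    let base = total_rows / num_output_partitions;
    let remainder = total_rows % num_output_partitions;
    (0..num_output_partitions)
        // The first `remainder` partitions each receive one extra row.
        .map(|i| base + if i < remainder { 1 } else { 0 })
        .collect()
}

fn main() {
    // 10 rows over 3 partitions: 10 % 3 = 1, so partition 0 gets one extra.
    assert_eq!(split_rows(10, 3), vec![4, 3, 3]);
    // Row counts are always preserved in total.
    assert_eq!(split_rows(1000, 50).iter().sum::<usize>(), 1000);
}
```

The alternative the comment suggests (all rows to partition 0) would make `split_rows(10, 3)` return `[10, 0, 0]`, which is simpler but skews one partition.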
```rust
for (i, offset) in offsets[..num_output_partitions].iter_mut().enumerate() {
    *offset = output_data.stream_position()?;
    let row_count = base + if i < remainder { 1 } else { 0 };
    if row_count > 0 {
```
The correct behavior is to not emit a batch at all if `row_count` is 0? Just confirming. We don't need a sentinel batch with a `row_count` of 0?
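A toy sketch of what "no sentinel batch" means for the offset table (this stands in for the real IPC encoding; the 8-byte payload is a placeholder, and the function name is hypothetical): an empty partition simply records the current stream position, so its range `offsets[i]..offsets[i + 1]` is zero-length and the reader yields nothing.

```rust
use std::io::{Cursor, Seek, Write};

// Sketch only: write one fake "batch" per non-empty partition and record
// per-partition start offsets, mirroring the quoted loop's structure.
fn write_partitions(counts: &[usize]) -> std::io::Result<(Vec<u64>, Vec<u8>)> {
    let mut out = Cursor::new(Vec::new());
    let mut offsets = vec![0u64; counts.len() + 1];
    for (i, &row_count) in counts.iter().enumerate() {
        offsets[i] = out.stream_position()?;
        if row_count > 0 {
            // Stand-in for encoding an IPC batch: write the row count as bytes.
            out.write_all(&(row_count as u64).to_le_bytes())?;
        }
        // row_count == 0: nothing written; offsets[i] == offsets[i + 1].
    }
    offsets[counts.len()] = out.stream_position()?;
    Ok((offsets, out.into_inner()))
}

fn main() -> std::io::Result<()> {
    let (offsets, bytes) = write_partitions(&[2, 0, 3])?;
    // Partition 1 is empty: its range [8, 8) is zero-length.
    assert_eq!(offsets, vec![0, 8, 8, 16]);
    assert_eq!(bytes.len(), 16);
    Ok(())
}
```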
```rust
let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data);

// Distribute rows evenly: each partition gets total/N, first (total%N) get one extra
let base = total_rows / num_output_partitions;
```
I'm still a bit confused why we partition at all in this case. Why not send the whole `num_rows` count to partition 0 in one batch and leave the others empty? You'd effectively be doing the final aggregation/partition coalescing at this step, so I have no idea if that's valid for all aggregations that could yield this shuffle scenario, but it seems we're doing extra work here just to decode O(partitions) IPC batches carrying `num_rows` on the other side of the shuffle and final aggregation.
That's what I was originally thinking when we could catch this early and just write a single value.
I see now: we need to address this extreme use case with single partitioning, because the data transmitted is too small and there is no reason to spin up the entire shuffle pipeline for it. Apparently such batches can only occur in extreme aggregation cases like this and shouldn't affect other queries. Let's see if it works.
I also think this got lost.
```rust
self.metrics.baseline.record_output(num_rows);
// All rows go to partition 0: partition_starts = [0, num_rows, num_rows, ...]
// partition_row_indices = [0, 1, 2, ..., num_rows-1]
let mut scratch = std::mem::take(&mut self.scratch);
```
This still looks way more complicated than what I would expect. Why do we need scratch space and to write `num_rows` entries into `partition_row_indices`? Why are we "partitioning" rows that don't exist?
Just trying CI to see if the single-partition approach breaks anything.
It's fine; I shortened the PR. The shuffle steps for count batches are:

- `partitioning_batch` sees `num_columns() == 0`, buffers the batch, and pushes all row indices into `partition_indices[0]`, skipping hashing
- The IPC stream encodes the schema (no fields) and a single record batch message carrying just the row count in the metadata
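A pure-Rust sketch of the first step (function and variable names are simplified stand-ins, not the PR's actual signatures), matching the quoted comment `partition_starts = [0, num_rows, num_rows, ...]`: all row indices are routed to partition 0 and the remaining partitions stay empty.

```rust
// Sketch only: route every row of a zero-column batch to partition 0,
// skipping hashing entirely. Assumes num_partitions >= 1.
fn route_zero_column_batch(
    num_rows: usize,
    num_partitions: usize,
) -> (Vec<usize>, Vec<Vec<usize>>) {
    // partition_starts = [0, num_rows, num_rows, ...]: partition 0 owns all rows.
    let mut partition_starts = vec![num_rows; num_partitions + 1];
    partition_starts[0] = 0;
    // partition_indices[0] = [0, 1, ..., num_rows - 1]; the rest stay empty.
    let mut partition_indices: Vec<Vec<usize>> = vec![Vec::new(); num_partitions];
    partition_indices[0] = (0..num_rows).collect();
    (partition_starts, partition_indices)
}

fn main() {
    let (starts, indices) = route_zero_column_batch(3, 4);
    assert_eq!(starts, vec![0, 3, 3, 3, 3]);
    assert_eq!(indices[0], vec![0, 1, 2]);
    assert!(indices[1].is_empty());
}
```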
Which issue does this PR close?
Closes #3846.
Rationale for this change
What changes are included in this PR?
How are these changes tested?