Merged
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1198,7 +1198,7 @@ impl GroupedHashAggregateStream {
         // instead.
         // Spilling to disk and reading back also ensures batch size is consistent
         // rather than potentially having one significantly larger last batch.
-        self.spill()?;
+        self.spill()?; // TODO: use sort_batch_chunked instead?
 
         // Mark that we're switching to stream merging mode.
         self.spill_state.is_stream_merging = true;
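The TODO above names `sort_batch_chunked`. A minimal sketch of the idea that name suggests, assuming (hypothetically; only the TODO mentions it) a helper that sorts one in-memory batch and emits uniform fixed-size slices, matching the comment's goal of consistent batch sizes with no oversized trailing batch:

```rust
use arrow::compute::{lexsort_to_indices, take_record_batch, SortColumn};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

/// Hypothetical sketch: sort `batch` once, then hand back `chunk_size`-row
/// slices so the last emitted batch is never larger than the others.
fn sort_batch_chunked(
    batch: &RecordBatch,
    sort_columns: &[SortColumn],
    chunk_size: usize,
) -> Result<Vec<RecordBatch>> {
    // Compute the sorted row ordering once for the whole batch.
    let indices = lexsort_to_indices(sort_columns, None)?;
    let sorted = take_record_batch(batch, &indices)?;

    // Slice the sorted batch into uniform chunks; slicing is zero-copy,
    // and only the final chunk may be shorter than `chunk_size`.
    let mut chunks = Vec::with_capacity(sorted.num_rows().div_ceil(chunk_size));
    let mut offset = 0;
    while offset < sorted.num_rows() {
        let len = chunk_size.min(sorted.num_rows() - offset);
        chunks.push(sorted.slice(offset, len));
        offset += len;
    }
    Ok(chunks)
}
```

Whether DataFusion already exposes a helper of this exact shape is precisely what the TODO leaves open; the unconditional `self.spill()?` remains the actual behavior of this change.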
12 changes: 8 additions & 4 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 
-use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::sort::get_reserved_bytes_for_record_batch_size;
 use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
 use crate::stream::RecordBatchStreamAdapter;
 use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
Expand Down Expand Up @@ -360,9 +360,13 @@ impl MultiLevelMergeBuilder {
for spill in &self.sorted_spill_files {
// For memory pools that are not shared this is good, for other this is not
// and there should be some upper limit to memory reservation so we won't starve the system
match reservation.try_grow(get_reserved_byte_for_record_batch_size(
spill.max_record_batch_memory * buffer_len,
)) {
match reservation.try_grow(
get_reserved_bytes_for_record_batch_size(
spill.max_record_batch_memory,
// Size will be the same as the sliced size, bc it is a spilled batch.
spill.max_record_batch_memory,
) * buffer_len,
) {
Ok(_) => {
number_of_spills_to_read_for_current_phase += 1;
}
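The loop this hunk modifies admits spill files into the current merge phase one at a time, growing the memory reservation until `try_grow` fails; the `number_of_spills_to_read_for_current_phase` counter suggests the remaining spills are deferred to a later phase. A minimal standalone sketch of that pattern against a real `MemoryPool`, where the per-spill sizes, `buffer_len`, and the `reserved_bytes` stand-in for `get_reserved_bytes_for_record_batch_size` are all assumptions for illustration:

```rust
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use std::sync::Arc;

// Stand-in for `get_reserved_bytes_for_record_batch_size`; the real helper
// takes both the batch size and its sliced size, which are equal for a
// spilled batch, as the inline comment in the diff notes.
fn reserved_bytes(batch_size: usize, sliced_size: usize) -> usize {
    batch_size.max(sliced_size)
}

fn main() {
    // A fixed-size pool so that admission eventually fails.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("multi_level_merge").register(&pool);

    // Hypothetical per-spill maximum record-batch sizes, in bytes.
    let spill_batch_sizes = [300_000usize, 300_000, 300_000, 300_000];
    let buffer_len = 2; // assumed number of batches buffered per spill stream

    let mut spills_admitted = 0;
    for &size in &spill_batch_sizes {
        // Admit the spill only if the pool can cover its buffered batches.
        match reservation.try_grow(reserved_bytes(size, size) * buffer_len) {
            Ok(_) => spills_admitted += 1,
            Err(_) => break, // merge the admitted subset in this phase
        }
    }
    println!("merging {spills_admitted} spill files this phase");
}
```

The fix in the hunk multiplies the whole reserved estimate by `buffer_len` rather than inflating the batch size before estimation, so the reservation now scales the per-batch overhead correctly.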