Intermediate result blocked approach to aggregation memory management#15591

Draft
Rachelint wants to merge 72 commits intoapache:mainfrom
Rachelint:intermeidate-result-blocked-approach

Conversation

@Rachelint
Contributor

@Rachelint Rachelint commented Apr 5, 2025

Which issue does this PR close?

Rationale for this change

As mentioned in #7065, we use a single Vec to manage the aggregation intermediate results in both GroupAccumulator and GroupValues.

This is simple, but not efficient enough for high-cardinality aggregation: when the Vec is not large enough, we need to allocate a new Vec and copy all data from the old one.

  • Copying a large amount of data (due to high cardinality) is obviously expensive
  • It is also unfriendly to the CPU (the copy churns the cache and TLB)

So this PR introduces a blocked approach to manage the aggregation intermediate results. In this approach we never resize the Vec; instead we split the data into blocks, and when the capacity is not enough, we just allocate a new block. See #7065 for details.
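To make the idea concrete, here is a minimal sketch of blocked storage. It is not the code in this PR: `BlockedVec`, its fields, and its methods are hypothetical names chosen for illustration, and it assumes fixed-capacity blocks addressed by a `(block_id, block_offset)` pair.

```rust
/// Hypothetical blocked container (illustration only, not the PR's type).
/// Values live in fixed-capacity blocks, so a full block is never copied.
struct BlockedVec<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
}

impl<T> BlockedVec<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { blocks: Vec::new(), block_size }
    }

    /// Appends a value and returns its `(block_id, block_offset)`.
    fn push(&mut self, value: T) -> (usize, usize) {
        let need_new_block = match self.blocks.last() {
            Some(block) => block.len() == self.block_size,
            None => true,
        };
        if need_new_block {
            // Existing blocks stay where they are; only a new block is allocated.
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        let block_id = self.blocks.len() - 1;
        let block = &mut self.blocks[block_id];
        block.push(value);
        (block_id, block.len() - 1)
    }

    /// Looks up a value by block id and offset inside that block.
    fn get(&self, block_id: usize, block_offset: usize) -> Option<&T> {
        self.blocks.get(block_id)?.get(block_offset)
    }

    /// Total number of stored values across all blocks.
    fn len(&self) -> usize {
        self.blocks.iter().map(Vec::len).sum()
    }
}
```

In this sketch the outer `Vec` of blocks can still grow, but growing it only moves the small per-block headers, not the stored values.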

What changes are included in this PR?

  • Implement the sketch for blocked approach
  • Implement blocked groups supporting PrimitiveGroupsAccumulator and GroupValuesPrimitive as the example

Are these changes tested?

Tested by existing tests, plus new unit tests and new fuzz tests.

Are there any user-facing changes?

Two functions are added to the GroupValues and GroupAccumulator traits.

But as you can see, they have default implementations, so users can choose to support the blocked approach only when they want better performance for their UDAFs.

    /// Returns `true` if this accumulator supports blocked groups.
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    /// Alter the block size in the accumulator
    ///
    /// If the target block size is `None`, it will use a single big
    /// block (think of it as a `Vec`) to manage the state.
    ///
    /// If the target block size is `Some(blk_size)`, it will try to
    /// set the block size to `blk_size`, and the attempt will only succeed
    /// when the accumulator supports blocked mode.
    ///
    /// NOTICE: After altering the block size, all previous data will be cleared.
    ///
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
        if block_size.is_some() {
            return Err(DataFusionError::NotImplemented(
                "this accumulator doesn't support blocked mode yet".to_string(),
            ));
        }

        Ok(())
    }
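As a rough illustration of how a UDAF could opt in, the sketch below uses a simplified stand-in trait with the same two method names. Everything else is an assumption made for the example: the trait name `BlockedGroups`, the `String` error type (the real methods return DataFusion's `Result`), the two accumulator types, and the `try_enable_blocked` helper are all hypothetical.

```rust
/// Simplified stand-in for the two new trait methods (hypothetical trait;
/// the real traits have many more methods and return DataFusion's `Result`).
trait BlockedGroups {
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        if block_size.is_some() {
            return Err("this accumulator doesn't support blocked mode yet".to_string());
        }
        Ok(())
    }
}

/// Hypothetical accumulator that keeps the defaults and stays in flat mode.
struct FlatOnly;
impl BlockedGroups for FlatOnly {}

/// Hypothetical accumulator that opts in to blocked mode.
struct BlockedSum {
    block_size: Option<usize>,
    blocks: Vec<Vec<i64>>,
}

impl BlockedGroups for BlockedSum {
    fn supports_blocked_groups(&self) -> bool {
        true
    }

    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        // Per the doc comment above, altering the block size clears previous data.
        self.blocks.clear();
        self.block_size = block_size;
        Ok(())
    }
}

/// Hypothetical caller-side check: enable blocked mode only if supported.
fn try_enable_blocked(acc: &mut dyn BlockedGroups, block_size: usize) -> bool {
    acc.supports_blocked_groups() && acc.alter_block_size(Some(block_size)).is_ok()
}
```

With this shape, existing accumulators compile unchanged and simply stay in flat mode.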

@Rachelint Rachelint changed the title from "Impl Intermeidate result blocked approach framework" to "Impl intermeidate result blocked approach framework" Apr 5, 2025
@Rachelint Rachelint changed the title from "Impl intermeidate result blocked approach framework" to "Impl intermeidate result blocked approach sketch" Apr 5, 2025
@github-actions github-actions bot added the logical-expr Logical plan and expressions label Apr 5, 2025
@Dandandan
Contributor

Hi @Rachelint I think I have an alternative proposal that seems relatively easy to implement.
I'll share it with you once I have some time to validate the design (probably this evening).

@Rachelint
Contributor Author

Rachelint commented Apr 8, 2025

> Hi @Rachelint I think I have an alternative proposal that seems relatively easy to implement. I'll share it with you once I have some time to validate the design (probably this evening).

Thanks a lot. The design in this PR indeed still introduces quite a few code changes...

At first I tried not to modify anything in GroupAccumulator:

  • Only implement the blocked logic in GroupValues
  • Then reorder the input batch according to the block indices obtained from GroupValues
  • Apply the input batch to the related GroupAccumulator using slices
  • When we find that a new block is needed, create a new GroupAccumulator (one GroupAccumulator per block)

But I found that this way introduces too much extra cost...

Maybe we could place the block indices into values in merge/update_batch as an Array?
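The reordering step in the list above can be sketched roughly as follows. This is only an illustration of where the extra cost comes from, not code from the PR: the function name `rows_by_block` is hypothetical, and it assumes a group index maps to a block by integer division and to an offset by remainder.

```rust
/// Buckets the rows of one input batch by block id (hypothetical helper).
/// For each block it returns the `(row_index, offset_in_block)` pairs that
/// the accumulator owning that block would have to process.
fn rows_by_block(group_indices: &[usize], block_size: usize) -> Vec<Vec<(usize, usize)>> {
    assert!(block_size > 0, "block size must be non-zero");
    let mut buckets: Vec<Vec<(usize, usize)>> = Vec::new();
    for (row, &group) in group_indices.iter().enumerate() {
        // Assumption: groups are laid out densely, block after block.
        let block_id = group / block_size;
        let offset = group % block_size;
        if block_id >= buckets.len() {
            buckets.resize_with(block_id + 1, Vec::new);
        }
        buckets[block_id].push((row, offset));
    }
    buckets
}
```

Every batch pays for this extra pass and for gathering the rows of each bucket before the per-block accumulators can run, which is consistent with the overhead described above.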

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 2 times, most recently from cc37eba to f690940 Compare April 9, 2025 14:37
@github-actions github-actions bot added the functions Changes to functions implementation label Apr 10, 2025
@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch from 95c6a36 to a4c6f42 Compare April 10, 2025 11:10
@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Apr 10, 2025
@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 6 times, most recently from 2100a5b to 0ee951c Compare April 17, 2025 11:56
@Rachelint
Contributor Author

Rachelint commented Apr 17, 2025

Development (and testing) of all the needed common structs is finished!
The remaining four things for this one:

  • Support the blocked-related logic in GroupedHashAggregateStream (we can copy it from "Sketch for aggregation intermediate results blocked management" #11943)
  • Logic for deciding when we should enable this optimization
  • Example blocked versions of GroupAccumulator and GroupValues
  • Unit tests for the blocked GroupValuesPrimitive, which is a bit complex
  • Fuzz tests
  • Chore: fix docs, fix clippy, add more comments...

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 2 times, most recently from c51d409 to 2863809 Compare April 20, 2025 14:46
@github-actions github-actions bot added execution Related to the execution crate common Related to common crate sqllogictest SQL Logic Tests (.slt) labels Apr 21, 2025
@Rachelint
Contributor Author

It is very close, just need to add more tests!

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 3 times, most recently from 31d660d to 2b8dd1e Compare April 22, 2025 18:52
@ahmed-mez
Contributor

Hi @Rachelint 👋 just wanted to check in on this! The last commit was about a month ago, any update on where things stand? Also worth noting that this work could help fix or mitigate #19906, so there's renewed interest in getting it over the line.

Thanks for all the effort you've put into this. It's really appreciated!

@Rachelint
Contributor Author

Rachelint commented Mar 28, 2026

> Hi @Rachelint 👋 just wanted to check in on this! The last commit was about a month ago, any update on where things stand? Also worth noting that this work could help fix or mitigate #19906, so there's renewed interest in getting it over the line.
>
> Thanks for all the effort you've put into this. It's really appreciated!

Sorry for the long delay, which was due to some private reasons. I will try to make it ready this weekend:

  • already fixed bugs in accumulate
  • I am porting this pr to the new spilling logic in row_hash.rs

kamillecao and others added 5 commits April 6, 2026 14:17
- Generate dense group indices (0..N) via hash-table-style dedup in
  fuzz tests, matching real GroupedHashAggregateStream behavior.
  Previously sparse random indices caused SeenValues::All to mark
  never-appeared groups as "seen" on transition to Some mode.
- Use build_single_null_buffer() instead of build(EmitTo::All) to
  avoid panic in blocked mode.
- Replace deprecated gen_range with random_range.
- Extract Fixture::new_fixed() from inline test setup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Refactor `maybe_enable_blocked_groups` into a pure predicate
`can_enable_blocked_groups` that returns bool, moving `alter_block_size`
calls to the caller. Adjust OOM mode selection to account for infinite
memory pools explicitly, and add assertions before entering
ProducingBlocks state. Minor rustfmt fixes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add SessionContextOptions (skip_partial / sort_hint / enable_blocked_groups)
with Option<bool> semantics: None → randomized, Some(true) → force on,
Some(false) → force off. Thread it through AggregationFuzzerBuilder →
AggregationFuzzer → SessionContextGenerator.

Also simplify can_enable_blocked_groups: flatten match to && and take
&Box<dyn GroupValues> to match call-site ergonomics.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Rachelint
Contributor Author

I have fixed all the correctness problems; it can be ready again today.

… path

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@alchemist51
Contributor

alchemist51 commented Apr 6, 2026

This branch still has conflicts.

@Rachelint
Contributor Author

> This branch still has conflicts.

Yes, I have mainly solved the correctness problems in row_hash.rs and accumulate.rs.
I will clean up the code and solve the other small problems today.

@Dandandan Dandandan mentioned this pull request Apr 14, 2026
@Rachelint
Contributor Author

Busy with work these days... Fixing the last conflicts now.

@Dandandan
Contributor

I was checking it out as well, and playing around with it

///
#[derive(Debug)]
pub struct Blocks<B: Block> {
inner: VecDeque<B>,
Contributor

@Dandandan Dandandan Apr 14, 2026

I think it would be nice to avoid the VecDeque as I believe it is relatively slow to index (because of the %).

I think we can use a start offset instead during pop (and increment it), replace the block with an empty one to "pop" it and reclaim the memory.
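A minimal sketch of that suggestion, under stated assumptions: `BlockQueue` and its methods are hypothetical names, not the PR's `Blocks` type, and block ids are assumed to stay absolute after a pop so that indexing remains a plain `Vec` lookup.

```rust
/// Hypothetical block queue backed by a plain `Vec` plus a start offset
/// (illustration only, not the PR's `Blocks` type).
struct BlockQueue<T> {
    blocks: Vec<Vec<T>>,
    /// Index of the first block that has not been popped yet.
    start: usize,
}

impl<T> BlockQueue<T> {
    fn new() -> Self {
        Self { blocks: Vec::new(), start: 0 }
    }

    fn push_block(&mut self, block: Vec<T>) {
        self.blocks.push(block);
    }

    /// "Pops" the front block: an empty `Vec` is swapped into its slot, so the
    /// queue no longer holds the block's memory, and `start` is incremented.
    fn pop_front(&mut self) -> Option<Vec<T>> {
        let slot = self.blocks.get_mut(self.start)?;
        let block = std::mem::take(slot);
        self.start += 1;
        Some(block)
    }

    /// Plain `Vec` indexing by absolute block id, with no ring-buffer wrap-around.
    fn get(&self, block_id: usize, block_offset: usize) -> Option<&T> {
        if block_id < self.start {
            return None; // this block was already popped
        }
        self.blocks.get(block_id)?.get(block_offset)
    }

    /// Number of blocks that have not been popped yet.
    fn remaining(&self) -> usize {
        self.blocks.len() - self.start
    }
}
```

The trade-off in this sketch is that popped slots keep an empty `Vec` header in the outer vector until the whole queue is dropped or reset.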

Contributor Author

@Rachelint Rachelint Apr 15, 2026

Yes, it is better to use Vec<T>, and I tried it back when I still saw this as a performance improvement feature.
However, after many tries, I found it actually can't help DataFusion run faster (it only helps with better memory management)... So I finally switched to VecDeque for simplicity...

The experiments can be seen in this archived branch:
https://github.com/Rachelint/arrow-datafusion/compare/intermeidate-result-blocked-approach-bak

Contributor

Ok, but I think the Vec approach is relatively simple as well?
Not to pin you down, but I think that once it is used more, it will probably come up later anyway.

Contributor Author

Makes sense, I am switching it to Vec.

- Add missing `?` operator on `take_orderings` call in first_last.rs
- Handle `EmitTo::NextBlock` in exhaustive matches for array_agg,
  first_last/state, and order modules
- Remove duplicate `Result` import in row.rs
- Remove unused imports (correlation.rs, multi_group_by/mod.rs)
- Add `#[cfg(test)]` to `TestSeenValuesResult` enum
|block_id, block_offset, new_value| {
// SAFETY: `block_id` and `block_offset` are guaranteed to be in bounds
let value = unsafe {
self.values[block_id as usize]
Contributor

this can use unsafe index as well (with a plain Vec it would certainly be faster)
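For illustration, unchecked indexing over a plain `Vec` of blocks could look like the sketch below. The function names and the `&[Vec<T>]` layout are assumptions made for the example, not the PR's code; `get_unchecked` is the standard slice method.

```rust
/// Reads `values[block_id][block_offset]` without bounds checks
/// (hypothetical helper).
///
/// # Safety
/// The caller must guarantee that `block_id < values.len()` and
/// `block_offset < values[block_id].len()`.
unsafe fn get_value_unchecked<T>(
    values: &[Vec<T>],
    block_id: usize,
    block_offset: usize,
) -> &T {
    // SAFETY: both indices are in bounds per this function's safety contract.
    unsafe { values.get_unchecked(block_id).get_unchecked(block_offset) }
}

/// Checked counterpart, handy in tests for validating the same indices.
fn get_value<T>(values: &[Vec<T>], block_id: usize, block_offset: usize) -> Option<&T> {
    values.get(block_id)?.get(block_offset)
}
```

Whether skipping the bounds checks is measurably faster here would need benchmarking; the sketch only shows the shape of the call.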

Rachelint and others added 8 commits April 15, 2026 00:45
The merge from main accidentally set the expected value of
collect_statistics to false in the SHOW ALL assertion. Restore
it to true to match the actual config default.

Also remove stale #[expect(dead_code)] on query_builder helpers
that are now used.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Blocked groups pre-allocates memory per block, increasing baseline
memory usage. Adjust spill test memory pools to accommodate:
- test_order_is_retained_when_spilling: 600 → 2000 bytes
- test_sort_reservation_fails_during_spill: keep at 500 (still
  triggers sort reservation failure with blocked groups)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rather than adjusting memory limits in existing tests, split each into
two variants that explicitly set enable_aggregation_blocked_groups:

- test_order_is_retained_when_spilling_{flat,blocked}: both use 2000
  bytes (the original 600 is no longer sufficient after upstream
  accumulator memory changes).
- test_sort_reservation_fails_during_spill_{flat,blocked}: both use
  500 bytes which still triggers the expected sort reservation OOM.

Also add enable_blocked_groups parameter to new_spill_ctx helper so
each test explicitly controls the grouping mode.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
In flat mode (block_size=None), `new_block` was using
`Vec::with_capacity(DEFAULT_BLOCK_CAP=128)` regardless, causing the
reported `size()` to jump from ~32 bytes to 1024 bytes for only 3
groups. This made sort_headroom reservation exceed tight memory pools.

Fix: in flat mode, use `Vec::new()` and let `resize` grow via the
standard Vec growth strategy, matching the original behavior.

Also restore flat test memory limit to original 600 bytes (now passes
again), and keep blocked test at 600 bytes (batch_size=1 means
per-block capacity is just 1 slot).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add #[expect(clippy::borrowed_box)] to can_enable_blocked_groups
- Collapse nested if statements in switch_to_skip_aggregation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix formatting in context_generator.rs, fuzzer.rs, array_agg.rs,
  multi_group_by/mod.rs
- Remove stale #[expect(dead_code)] on with_no_grouping (now used)
- Update configs.md with new enable_aggregation_blocked_groups entry

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Function was renamed to can_enable_blocked_groups but the
rustdoc link in GroupedHashAggregateStream doc comment was
not updated, causing `cargo doc -D warnings` to fail.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Rachelint
Contributor Author

CI passed and there are no conflicts again. Remaining things before it is ready:

  • improve tests
  • improve Blocks

Rachelint and others added 2 commits April 18, 2026 23:55
- Add section comments to categorize tests: helpers, basic correctness,
  OOM/cancellation, ordered aggregation, schema/planning, skip
  aggregation, spill/memory, statistics, and multi-stage
- Add `task_ctx_with_blocked_groups` helper for non-spill tests
- Add `enable_blocked_groups` param to `check_aggregates`,
  `check_grouping_sets`, `first_last_multi_partitions`, and
  `run_test_with_spill_pool_if_necessary`
- Wrap all grouped aggregation tests in `for enable_blocked in
  [false, true]` loops so both flat and blocked storage modes are
  exercised
- Merge `test_order_is_retained_when_spilling_{flat,blocked}` and
  `test_sort_reservation_fails_during_spill_{flat,blocked}` back into
  single tests with loops

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Labels

  • common: Related to common crate
  • core: Core DataFusion crate
  • documentation: Improvements or additions to documentation
  • ffi: Changes to the ffi crate
  • functions: Changes to functions implementation
  • logical-expr: Logical plan and expressions
  • physical-expr: Changes to the physical-expr crates
  • physical-plan: Changes to the physical-plan crate
  • sqllogictest: SQL Logic Tests (.slt)

7 participants