
[feature](be) Add adaptive batch size for pipeline operators #62975

Open
mrhhsg wants to merge 4 commits into apache:master from mrhhsg:abs_operators

@mrhhsg (Member) commented Apr 30, 2026

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Extend adaptive batch size from the scan path to the remaining pipeline operators, including join, aggregation, exchange, union, table function, and sort outputs.
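A minimal sketch of the general idea, assuming an operator turns a preferred block size in bytes into a row count using the average bytes per row of the blocks it has already emitted. `AdaptiveBatchSizer`, its parameters, and the running-average policy are hypothetical illustrations, not the Doris implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper (not a Doris class): tracks the bytes per row of the
// blocks an operator has emitted and converts a byte budget into a row
// count for the next block.
class AdaptiveBatchSizer {
public:
    AdaptiveBatchSizer(size_t max_rows, size_t max_bytes, size_t min_rows = 1)
            : max_rows_(max_rows), max_bytes_(max_bytes), min_rows_(min_rows) {
        assert(min_rows_ >= 1 && min_rows_ <= max_rows_);
    }

    // Record a block that was just emitted.
    void observe(size_t rows, size_t bytes) {
        total_rows_ += rows;
        total_bytes_ += bytes;
    }

    // Row count for the next block: the byte budget divided by the average
    // bytes per row seen so far, clamped to [min_rows, max_rows].
    size_t next_rows() const {
        if (total_rows_ == 0 || total_bytes_ == 0) {
            return max_rows_;  // no estimate yet: fall back to the row cap
        }
        // Round the average up so the estimate errs toward smaller blocks.
        const size_t avg = (total_bytes_ + total_rows_ - 1) / total_rows_;
        return std::clamp(max_bytes_ / avg, min_rows_, max_rows_);
    }

private:
    size_t max_rows_;
    size_t max_bytes_;
    size_t min_rows_;
    size_t total_rows_ = 0;
    size_t total_bytes_ = 0;
};
```

Clamping to a minimum row count keeps a few very wide rows from shrinking a block to zero rows. Whether Doris estimates from a running average, the last block, or per-column sizes is not stated in this PR.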

### Release note

None

### Check List (For Author)

- Test: Unit Test
    - Unit Test: `./run-be-ut.sh --run --filter='AggOperatorTestWithOutGroupBy.*:AggOperatorTestWithGroupBy.*:DistinctStreamingAggOperatorTest.*:ExchangeSourceOperatorXTest.*:HashJoinProbeOperatorTest.*:IntersectOperatorTest.*:ExceptOperatorTest.*:StreamingAggOperatorTest.*:TableFunctionOperatorTest.*:UnnestTest.*:UnionOperatorTest.*:FullSorterTest.*:PartitionSorterTest.*:SortMergerTest.*:MergeSorterStateTest.*'`
- Behavior changed: Yes (adaptive batch sizing now applies to more pipeline operators)
- Does this need documentation: No

mrhhsg added 2 commits April 30, 2026 15:07
Issue Number: None

Related PR: None

Problem Summary: Remove unused reader context and generic reader fields in the current staged changes so the reader path stays aligned with the current output-column and batch-size handling.

### Release note

None

### Check List (For Author)

- Test: No need to test (commits only the currently staged tracked changes)
- Behavior changed: No
- Does this need documentation: No
@hello-stephen (Contributor)

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@yiguolei (Contributor) commented May 1, 2026

run buildall

mrhhsg and others added 2 commits May 5, 2026 10:47
…rators

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary: Address review feedback on the adaptive batch size feature
(commit 908ce1d):

1. ProcessHashTableProbe::_init_probe_side: skip the build-side bytes-per-row
   contribution for left-semi/anti joins (which only output probe columns) and
   skip probe-side bytes for right-semi/anti joins. Without this filtering the
   first-batch row count is under-estimated and the emitted block is smaller
   than _block_max_bytes allows.
2. VSortedRunMerger::get_next: clarify in a comment that the cursor is
   intentionally left in the priority queue on the partial-slice path; the
   shared MergeSortBlockCursor impl ensures next() updates the queue's view in
   place.
3. BlockSerializer::next_serialized_block: document that _budget (target output
   size) and _buffer_mem_limit (back-pressure cap from
   Channel::set_buffer_mem_limit) intentionally coexist.
4. NestedLoopJoinProbeLocalState::_finalize_current_phase: rename the
   misleading 'column_size' local (which actually holds the dst column row
   count) to 'current_row_count'.
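Item 1 can be illustrated with a small sketch, assuming the bytes-per-row pre-estimate is the sum of per-column estimates for the sides a join actually outputs, and the first batch's row count is the byte budget divided by that estimate. `JoinKind`, `output_bytes_per_row`, and `first_batch_rows` are hypothetical names, not the code in `ProcessHashTableProbe::_init_probe_side`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical subset of join kinds; Doris's own join-op enum has more.
enum class JoinKind { INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI, RIGHT_SEMI, RIGHT_ANTI };

// Sum of per-column bytes-per-row estimates for one side of the join.
size_t side_bytes(const std::vector<size_t>& col_bytes) {
    return std::accumulate(col_bytes.begin(), col_bytes.end(), size_t{0});
}

// Bytes per output row, counting only the sides the join actually emits:
// left semi/anti output probe columns only, right semi/anti build columns only.
size_t output_bytes_per_row(JoinKind kind, const std::vector<size_t>& probe_cols,
                            const std::vector<size_t>& build_cols) {
    const bool probe_only = kind == JoinKind::LEFT_SEMI || kind == JoinKind::LEFT_ANTI;
    const bool build_only = kind == JoinKind::RIGHT_SEMI || kind == JoinKind::RIGHT_ANTI;
    size_t bytes = 0;
    if (!build_only) bytes += side_bytes(probe_cols);
    if (!probe_only) bytes += side_bytes(build_cols);
    return bytes;
}

// Rows for the first output block given a byte budget (hypothetical helper).
size_t first_batch_rows(size_t block_max_bytes, size_t bytes_per_row, size_t max_rows) {
    assert(max_rows >= 1);
    if (bytes_per_row == 0) return max_rows;  // no estimate: use the row cap
    return std::min(max_rows, std::max<size_t>(1, block_max_bytes / bytes_per_row));
}
```

With probe columns of 8 and 16 bytes and a 100-byte build column, counting the build side for a LEFT_SEMI join gives 124 bytes per row and a first block of 19 rows under a 2400-byte budget, instead of the 100 rows that the 24 probe-side bytes allow.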

### Release note

None

### Check List (For Author)

- Test: No need to test (comment/rename refactor and behavior-preserving
  estimation tweak; existing operator/sort UTs still cover the affected code
  paths)
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
### What problem does this PR solve?

Issue Number: N/A

Related PR: follow-up to commit ff7609e

Problem Summary: Add unit-test coverage for the review-driven changes to the
adaptive-batch-size paths:

- HashJoinProbeOperatorTest.LeftSemiJoinWithAdaptiveBatchSize and
  RightSemiJoinWithAdaptiveBatchSize exercise
  ProcessHashTableProbe::_init_probe_side under a tight preferred block
  size to validate that the per-output-side filtering of the
  bytes-per-row pre-estimate (build-side excluded for LEFT_SEMI;
  probe-side excluded for RIGHT_SEMI) still yields correct results.
- BlockSerializerTest covers the dual-threshold logic in
  BlockSerializer::next_serialized_block: byte budget breakout, EOS
  forcing serialization, and no-trigger leaving the block buffered.
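The dual-threshold behavior these tests target can be sketched as follows, assuming that reaching the byte budget, reaching the back-pressure cap, or hitting EOS with buffered data each triggers serialization. `DualThresholdSerializer` and its members are hypothetical; the real `BlockSerializer::next_serialized_block` signature and its exact use of `_budget` and `_buffer_mem_limit` are not shown in this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical model (not Doris's BlockSerializer API): buffers row data
// into a pending block and decides when to serialize it.
class DualThresholdSerializer {
public:
    // budget_bytes: target size of one serialized block.
    // buffer_mem_limit: optional back-pressure cap (0 = not set).
    DualThresholdSerializer(size_t budget_bytes, size_t buffer_mem_limit)
            : budget_(budget_bytes), buffer_mem_limit_(buffer_mem_limit) {
        assert(budget_ > 0);
    }

    // Adds `bytes` of row data. Returns the size of the block serialized by
    // this call, or std::nullopt if the data stays buffered.
    std::optional<size_t> add(size_t bytes, bool eos) {
        pending_ += bytes;
        const bool over_budget = pending_ >= budget_;
        const bool over_limit = buffer_mem_limit_ > 0 && pending_ >= buffer_mem_limit_;
        const bool flush_at_eos = eos && pending_ > 0;
        if (over_budget || over_limit || flush_at_eos) {
            const size_t serialized = pending_;
            pending_ = 0;
            return serialized;
        }
        return std::nullopt;  // no trigger: keep buffering
    }

    size_t pending() const { return pending_; }

private:
    size_t budget_;
    size_t buffer_mem_limit_;
    size_t pending_ = 0;
};
```

The three branches correspond to the cases listed for BlockSerializerTest: budget breakout, EOS forcing serialization, and no trigger leaving the block buffered.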

### Release note

None

### Check List (For Author)

- Test: Unit Test
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mrhhsg (Member, Author) commented May 6, 2026

run buildall

