diff --git a/be/src/exec/common/hash_table/join_hash_table.h b/be/src/exec/common/hash_table/join_hash_table.h index 9b63040330bdf2..db2d3a29ef3a84 100644 --- a/be/src/exec/common/hash_table/join_hash_table.h +++ b/be/src/exec/common/hash_table/join_hash_table.h @@ -99,22 +99,25 @@ class JoinHashTable { _keep_null_key = keep_null_key; } + // batch_size_limit: when > 0, overrides max_batch_size for this call only (thread-safe). template auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, bool& probe_visited, uint32_t* __restrict build_idxs, const uint8_t* null_map, - bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct) { + bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct, + int batch_size_limit) { if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && _empty_build_side) { return _process_null_aware_left_half_join_for_empty_build_side( - probe_idx, probe_rows, probe_idxs, build_idxs); + probe_idx, probe_rows, probe_idxs, build_idxs, batch_size_limit); } if (with_other_conjuncts) { - return _find_batch_conjunct( - keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); + return _find_batch_conjunct(keys, build_idx_map, probe_idx, + build_idx, probe_rows, probe_idxs, + build_idxs, batch_size_limit); } if (is_mark_join) { @@ -130,35 +133,39 @@ class JoinHashTable { if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) { return _find_batch_conjunct(keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, - build_idxs); + build_idxs, batch_size_limit); } - return _find_batch_conjunct( - keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); + return _find_batch_conjunct(keys, build_idx_map, probe_idx, + build_idx, probe_rows, probe_idxs, + build_idxs, batch_size_limit); } if 
(JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN || JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { - return _find_batch_inner_outer_join(keys, build_idx_map, probe_idx, - build_idx, probe_rows, probe_idxs, - probe_visited, build_idxs); + return _find_batch_inner_outer_join( + keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, + probe_visited, build_idxs, batch_size_limit); } // ASOF JOIN: for each probe row, find one matching build row (the closest match) // The actual closest match logic is handled in ProcessHashTableProbe if (JoinOpType == TJoinOp::ASOF_LEFT_INNER_JOIN || JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN) { // Use conjunct path to get all matching rows, then filter in ProcessHashTableProbe - return _find_batch_conjunct( - keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); + return _find_batch_conjunct(keys, build_idx_map, probe_idx, + build_idx, probe_rows, probe_idxs, + build_idxs, batch_size_limit); } if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { if (null_map) { - return _find_batch_left_semi_anti( - keys, build_idx_map, probe_idx, probe_rows, probe_idxs, null_map); + return _find_batch_left_semi_anti(keys, build_idx_map, probe_idx, + probe_rows, probe_idxs, + null_map, batch_size_limit); } else { - return _find_batch_left_semi_anti( - keys, build_idx_map, probe_idx, probe_rows, probe_idxs, nullptr); + return _find_batch_left_semi_anti(keys, build_idx_map, probe_idx, + probe_rows, probe_idxs, + nullptr, batch_size_limit); } } if (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { @@ -178,27 +185,27 @@ class JoinHashTable { * select 'a' not in ('b', null) => null => 'a' != 'b' and 'a' != null => true and null => null * select 'a' not in ('a', 'b', null) => false */ - auto find_null_aware_with_other_conjuncts(const Key* 
__restrict keys, - const uint32_t* __restrict build_idx_map, - int probe_idx, uint32_t build_idx, int probe_rows, - uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs, - uint8_t* __restrict null_flags, - bool picking_null_keys, const uint8_t* null_map) { + auto find_null_aware_with_other_conjuncts( + const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, + uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, + uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys, + const uint8_t* null_map, int batch_size_limit) { if (null_map) { return _find_null_aware_with_other_conjuncts_impl( keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs, - null_flags, picking_null_keys, null_map); + null_flags, picking_null_keys, null_map, batch_size_limit); } else { return _find_null_aware_with_other_conjuncts_impl( keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs, - null_flags, picking_null_keys, nullptr); + null_flags, picking_null_keys, nullptr, batch_size_limit); } } + // batch_size_limit: when > 0, overrides max_batch_size for this call only. template - bool iterate_map(ColumnOffset32& build_idxs, ColumnFilterHelper* mark_column_helper) const { - const auto batch_size = max_batch_size; + bool iterate_map(ColumnOffset32& build_idxs, ColumnFilterHelper* mark_column_helper, + int batch_size_limit = 0) const { + const auto batch_size = batch_size_limit > 0 ? 
batch_size_limit : max_batch_size; const auto elem_num = visited.size(); int count = 0; build_idxs.resize(batch_size); @@ -244,7 +251,8 @@ class JoinHashTable { template auto _process_null_aware_left_half_join_for_empty_build_side(int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs) { + uint32_t* __restrict build_idxs, + int effective_batch_size) { if (JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -252,7 +260,7 @@ class JoinHashTable { "hash join input"); } uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; + const auto batch_size = effective_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { probe_idxs[matched_cnt] = probe_idx++; @@ -293,9 +301,9 @@ class JoinHashTable { auto _find_batch_left_semi_anti(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, - const uint8_t* null_map) { + const uint8_t* null_map, int effective_batch_size) { uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; + const auto batch_size = effective_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && has_null_map) { @@ -320,9 +328,10 @@ class JoinHashTable { template auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, - uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { + uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs, + int effective_batch_size) { uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; + const auto batch_size = effective_batch_size; auto do_the_probe = [&]() { while (build_idx && matched_cnt < batch_size) { @@ -375,9 +384,9 @@ class JoinHashTable { const uint32_t* 
__restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, bool& probe_visited, - uint32_t* __restrict build_idxs) { + uint32_t* __restrict build_idxs, int effective_batch_size) { uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; + const auto batch_size = effective_batch_size; auto do_the_probe = [&]() { while (build_idx && matched_cnt < batch_size) { @@ -429,9 +438,9 @@ class JoinHashTable { const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys, - const uint8_t* null_map) { + const uint8_t* null_map, int effective_batch_size) { uint32_t matched_cnt = 0; - const auto batch_size = max_batch_size; + const auto batch_size = effective_batch_size; auto do_the_probe = [&]() { /// If no any rows match the probe key, here start to handle null keys in build side. 
diff --git a/be/src/exec/exchange/local_exchange_source_operator.cpp b/be/src/exec/exchange/local_exchange_source_operator.cpp index ad092656f21793..4f265ab44f598c 100644 --- a/be/src/exec/exchange/local_exchange_source_operator.cpp +++ b/be/src/exec/exchange/local_exchange_source_operator.cpp @@ -21,6 +21,10 @@ namespace doris { +LocalExchangeSourceLocalState::LocalExchangeSourceLocalState(RuntimeState* state, + OperatorXBase* parent) + : Base(state, parent) {} + Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); diff --git a/be/src/exec/exchange/local_exchange_source_operator.h b/be/src/exec/exchange/local_exchange_source_operator.h index 58252b24ec2c23..4955c02d939e17 100644 --- a/be/src/exec/exchange/local_exchange_source_operator.h +++ b/be/src/exec/exchange/local_exchange_source_operator.h @@ -31,8 +31,7 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState; ENABLE_FACTORY_CREATOR(LocalExchangeSourceLocalState); - LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent) {} + LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; Status close(RuntimeState* state) override; diff --git a/be/src/exec/exchange/local_exchanger.cpp b/be/src/exec/exchange/local_exchanger.cpp index 620aae737050d6..2eec3c70d61815 100644 --- a/be/src/exec/exchange/local_exchanger.cpp +++ b/be/src/exec/exchange/local_exchanger.cpp @@ -22,7 +22,6 @@ #include "exec/exchange/local_exchange_sink_operator.h" #include "exec/exchange/local_exchange_source_operator.h" #include "exec/partitioner/partitioner.h" - namespace doris { template void Exchanger::_enqueue_data_and_set_ready(int channel_id, @@ -155,7 +154,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos, auto block_wrapper = 
partitioned_block.first; RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start, offset_start + partitioned_block.second.length)); - } while (mutable_block.rows() < state->batch_size() && !*eos && + if (source_info.local_state->block_budget().exceeded(mutable_block.rows(), + mutable_block.bytes())) { + break; + } + } while (mutable_block.rows() < source_info.local_state->block_budget().max_rows && !*eos && _dequeue_data(source_info.local_state, partitioned_block, eos, block, source_info.channel_id)); return Status::OK(); diff --git a/be/src/exec/exchange/vdata_stream_recvr.cpp b/be/src/exec/exchange/vdata_stream_recvr.cpp index 50dcff5bbd9cee..394a3b17aaf63e 100644 --- a/be/src/exec/exchange/vdata_stream_recvr.cpp +++ b/be/src/exec/exchange/vdata_stream_recvr.cpp @@ -426,13 +426,13 @@ VDataStreamRecvr::~VDataStreamRecvr() { Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first, size_t batch_size, - int64_t limit, size_t offset) { + int64_t limit, size_t offset, size_t block_max_bytes) { DCHECK(_is_merging); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); std::vector child_block_suppliers; // Create the merger that will a single stream of sorted rows. 
_merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit, - offset, _profile)); + offset, _profile, block_max_bytes)); for (int i = 0; i < _sender_queues.size(); ++i) { child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch), diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h index 9a5d0d3575945b..e0ef1d8d8b7367 100644 --- a/be/src/exec/exchange/vdata_stream_recvr.h +++ b/be/src/exec/exchange/vdata_stream_recvr.h @@ -78,7 +78,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { MOCK_FUNCTION Status create_merger(const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first, size_t batch_size, - int64_t limit, size_t offset); + int64_t limit, size_t offset, size_t block_max_bytes); std::vector sender_queues() const { return _sender_queues; } diff --git a/be/src/exec/exchange/vdata_stream_sender.cpp b/be/src/exec/exchange/vdata_stream_sender.cpp index 04e68aeb136d13..522953d618283e 100644 --- a/be/src/exec/exchange/vdata_stream_sender.cpp +++ b/be/src/exec/exchange/vdata_stream_sender.cpp @@ -288,7 +288,9 @@ Status Channel::close(RuntimeState* state) { } BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local) - : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} + : _parent(parent), + _is_local(is_local), + _budget(parent->state()->batch_size(), parent->state()->preferred_block_size_bytes()) {} Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, bool* serialized, bool eos, const uint32_t* data, @@ -309,7 +311,13 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t } } - if (_mutable_block->rows() >= _batch_size || eos || + // Two thresholds intentionally coexist: + // - _budget caps the *target* output block size (rows + bytes), shaping the + // average serialized batch sent 
downstream. + // - _buffer_mem_limit is the dynamic back-pressure cap propagated from + // Channel::set_buffer_mem_limit(); when the in-flight buffer grows past it + // we must flush regardless of the budget, to keep memory bounded. + if (_budget.exceeded(_mutable_block->rows(), _mutable_block->bytes()) || eos || (_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit)) { if (!_is_local) { RETURN_IF_ERROR(_serialize_block(dest, num_receivers)); diff --git a/be/src/exec/exchange/vdata_stream_sender.h b/be/src/exec/exchange/vdata_stream_sender.h index 0488cde3811650..7fc8d85d559df5 100644 --- a/be/src/exec/exchange/vdata_stream_sender.h +++ b/be/src/exec/exchange/vdata_stream_sender.h @@ -49,6 +49,7 @@ #include "runtime/runtime_profile.h" #include "service/backend_options.h" #include "storage/tablet_info.h" +#include "util/block_budget.h" #include "util/brpc_closure.h" #include "util/uid_util.h" @@ -72,7 +73,7 @@ class BlockSerializer { public: BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true); #ifdef BE_TEST - BlockSerializer() : _batch_size(0) {}; + BlockSerializer() : _parent(nullptr), _is_local(true), _budget(0, 0) {}; #endif Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, bool eos, const uint32_t* data = nullptr, @@ -97,7 +98,7 @@ class BlockSerializer { std::unique_ptr _mutable_block; bool _is_local; - const int _batch_size; + const BlockBudget _budget; std::atomic _buffer_mem_limit = UINT64_MAX; }; diff --git a/be/src/exec/operator/aggregation_source_operator.cpp b/be/src/exec/operator/aggregation_source_operator.cpp index d5385efdd06fe0..4857cd6e567366 100644 --- a/be/src/exec/operator/aggregation_source_operator.cpp +++ b/be/src/exec/operator/aggregation_source_operator.cpp @@ -27,7 +27,6 @@ #include "exprs/vexpr_fwd.h" #include "runtime/runtime_profile.h" #include "runtime/thread_context.h" - namespace doris { AggLocalState::AggLocalState(RuntimeState* state, 
OperatorXBase* parent) : Base(state, parent) {} @@ -120,6 +119,9 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc } } + // Compute effective max rows based on estimated bytes per row. + const size_t effective_max_rows = _budget.effective_max_rows(_estimated_row_bytes); + std::visit( Overload { [&](std::monostate& arg) -> void { @@ -128,7 +130,7 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc [&](auto& agg_method) -> void { agg_method.init_iterator(); auto& data = *agg_method.hash_table; - const auto size = std::min(data.size(), size_t(state->batch_size())); + const auto size = std::min(data.size(), effective_max_rows); using KeyType = std::decay_t::Key; std::vector keys(size); @@ -154,7 +156,7 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc { SCOPED_TIMER(_hash_table_iterate_timer); auto& it = agg_method.begin; - while (it != agg_method.end && num_rows < state->batch_size()) { + while (it != agg_method.end && num_rows < effective_max_rows) { keys[num_rows] = it.get_first(); auto inline_count = reinterpret_cast(it.get_second()); @@ -176,7 +178,7 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc if (agg_method.hash_table->has_null_key_data()) { DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); - if (num_rows < state->batch_size()) { + if (num_rows < effective_max_rows) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.hash_table->template get_null_key_data< @@ -205,7 +207,7 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc { SCOPED_TIMER(_hash_table_iterate_timer); while (iter != shared_state.aggregate_data_container->end() && - num_rows < state->batch_size()) { + num_rows < effective_max_rows) { keys[num_rows] = iter.template get_key(); shared_state.values[num_rows] = iter.get_aggregate_data(); ++iter; @@ -276,6 +278,10 @@ Status 
AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc *block = Block(columns_with_schema); } + if (block->rows() > 0) { + _estimated_row_bytes = block->bytes() / block->rows(); + } + return Status::OK(); } @@ -307,6 +313,10 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block } SCOPED_TIMER(_get_results_timer); + + // Compute effective max rows based on estimated bytes per row. + const size_t effective_max_rows = _budget.effective_max_rows(_estimated_row_bytes); + std::visit( Overload { [&](std::monostate& arg) -> void { @@ -315,7 +325,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block [&](auto& agg_method) -> void { auto& data = *agg_method.hash_table; agg_method.init_iterator(); - const auto size = std::min(data.size(), size_t(state->batch_size())); + const auto size = std::min(data.size(), effective_max_rows); using KeyType = std::decay_t::Key; std::vector keys(size); @@ -328,7 +338,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block { SCOPED_TIMER(_hash_table_iterate_timer); auto& it = agg_method.begin; - while (it != agg_method.end && num_rows < state->batch_size()) { + while (it != agg_method.end && num_rows < effective_max_rows) { keys[num_rows] = it.get_first(); auto& mapped = it.get_second(); count_column.insert_value(static_cast( @@ -347,7 +357,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block if (agg_method.hash_table->has_null_key_data()) { DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); - if (key_columns[0]->size() < state->batch_size()) { + if (key_columns[0]->size() < effective_max_rows) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.hash_table->template get_null_key_data< @@ -375,7 +385,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block { SCOPED_TIMER(_hash_table_iterate_timer); while (iter != 
shared_state.aggregate_data_container->end() && - num_rows < state->batch_size()) { + num_rows < effective_max_rows) { keys[num_rows] = iter.template get_key(); shared_state.values[num_rows] = iter.get_aggregate_data(); ++iter; @@ -401,7 +411,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block // here need additional processing logic on the null key / value DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); - if (key_columns[0]->size() < state->batch_size()) { + if (key_columns[0]->size() < effective_max_rows) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.hash_table->template get_null_key_data< AggregateDataPtr>(); @@ -433,6 +443,10 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block block->set_columns(std::move(columns)); } + if (block->rows() > 0) { + _estimated_row_bytes = block->bytes() / block->rows(); + } + return Status::OK(); } diff --git a/be/src/exec/operator/aggregation_source_operator.h b/be/src/exec/operator/aggregation_source_operator.h index c9348826ca4f85..e32738098f0d2c 100644 --- a/be/src/exec/operator/aggregation_source_operator.h +++ b/be/src/exec/operator/aggregation_source_operator.h @@ -80,6 +80,7 @@ class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState; diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp index 298896401d6f3e..d4f19d95393ee2 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp @@ -36,7 +36,6 @@ namespace doris { DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState(state, parent), - batch_size(state->batch_size()), _agg_data(std::make_unique()), _child_block(Block::create_unique()), _aggregated_block(Block::create_unique()), @@ -189,7 +188,8 @@ 
Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( SCOPED_TIMER(_insert_keys_to_column_timer); if (mem_reuse) { if (_stop_emplace_flag && !out_block->empty()) { - // when out_block row >= batch_size, push it to data_queue, so when _stop_emplace_flag = true, maybe have some data in block + // when out_block reaches the row/byte budget, push it to data_queue. so when + // _stop_emplace_flag = true, maybe have some data in block // need output those data firstly DCHECK(_distinct_row.empty()); _distinct_row.resize(rows); @@ -206,16 +206,16 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } else { DCHECK_EQ(_cache_block.rows(), 0); - // is output row > batch_size, split some to cache_block - if (out_block->rows() + _distinct_row.size() > batch_size) { - size_t split_size = batch_size - out_block->rows(); + const size_t max_rows_to_add = + _budget.remaining_rows(out_block->rows(), out_block->bytes()); + if (_distinct_row.size() > max_rows_to_add) { for (int i = 0; i < key_size; ++i) { auto output_dst = out_block->get_by_position(i).column->assume_mutable(); key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0, - split_size); + max_rows_to_add); auto cache_dst = _cache_block.get_by_position(i).column->assume_mutable(); - key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, split_size, - _distinct_row.size()); + key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, + max_rows_to_add, _distinct_row.size()); } } else { for (int i = 0; i < key_size; ++i) { @@ -393,7 +393,9 @@ bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) co auto& local_state = get_local_state(state); const bool need_batch = local_state._stop_emplace_flag ? 
local_state._aggregated_block->empty() - : local_state._aggregated_block->rows() < state->batch_size(); + : local_state.block_budget().within_budget( + local_state._aggregated_block->rows(), + local_state._aggregated_block->bytes()); return need_batch && !(local_state._child_eos || local_state._reach_limit); } diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h b/be/src/exec/operator/distinct_streaming_aggregation_operator.h index abf42eb50cc977..f9a4b8c09e71df 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h @@ -67,7 +67,6 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState _agg_data = nullptr; // group by k1,k2 VExprContextSPtrs _probe_expr_ctxs; diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp index e008d599078d77..e71daf945c2165 100644 --- a/be/src/exec/operator/exchange_source_operator.cpp +++ b/be/src/exec/operator/exchange_source_operator.cpp @@ -158,7 +158,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, boo SCOPED_TIMER(local_state.create_merger_timer); RETURN_IF_ERROR(local_state.stream_recvr->create_merger( local_state.ordering_expr_ctxs, _is_asc_order, _nulls_first, state->batch_size(), - _limit, _offset)); + _limit, _offset, state->preferred_block_size_bytes())); local_state.is_ready = true; return Status::OK(); } diff --git a/be/src/exec/operator/hashjoin_probe_operator.cpp b/be/src/exec/operator/hashjoin_probe_operator.cpp index 9b913cc9b23451..64fb6b584c9567 100644 --- a/be/src/exec/operator/hashjoin_probe_operator.cpp +++ b/be/src/exec/operator/hashjoin_probe_operator.cpp @@ -97,7 +97,7 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) { } else { _process_hashtable_ctx_variants ->emplace>( - this, state->batch_size()); + this, state->batch_size(), state->preferred_block_size_bytes()); } }, 
_shared_state->join_op_variants, make_bool_variant(p._have_other_join_conjunct)); diff --git a/be/src/exec/operator/join/process_hash_table_probe.h b/be/src/exec/operator/join/process_hash_table_probe.h index c5b86951d18854..73b56f3ac99158 100644 --- a/be/src/exec/operator/join/process_hash_table_probe.h +++ b/be/src/exec/operator/join/process_hash_table_probe.h @@ -40,7 +40,7 @@ using ConstNullMapPtr = const NullMap*; template struct ProcessHashTableProbe { - ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size); + ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size, size_t block_max_bytes); ~ProcessHashTableProbe() = default; // output build side result column @@ -94,7 +94,9 @@ struct ProcessHashTableProbe { HashJoinProbeLocalState* _parent = nullptr; HashJoinProbeOperatorX* _parent_operator = nullptr; - const int _batch_size; + int _batch_size; + const int _initial_batch_size; + const size_t _block_max_bytes; const std::shared_ptr& _build_block; std::unique_ptr _arena; diff --git a/be/src/exec/operator/join/process_hash_table_probe_impl.h b/be/src/exec/operator/join/process_hash_table_probe_impl.h index 5bfd2ff4e0cbfc..1ce388339402fc 100644 --- a/be/src/exec/operator/join/process_hash_table_probe_impl.h +++ b/be/src/exec/operator/join/process_hash_table_probe_impl.h @@ -66,10 +66,12 @@ static void mock_column_size(auto& col, size_t size) { template ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeLocalState* parent, - int batch_size) + int batch_size, size_t block_max_bytes) : _parent(parent), _parent_operator(&parent->_parent->template cast()), _batch_size(batch_size), + _initial_batch_size(batch_size), + _block_max_bytes(block_max_bytes), _build_block(parent->build_block()), _have_other_join_conjunct(_parent_operator->_have_other_join_conjunct), _left_output_slot_flags(_parent_operator->_left_output_slot_flags), @@ -185,13 +187,13 @@ template template typename HashTableType::State 
ProcessHashTableProbe::_init_probe_side( HashTableType& hash_table_ctx, uint32_t probe_rows, const uint8_t* null_map) { - // may over batch size 1 for some outer join case - _probe_indexs.resize(_batch_size + 1); - _build_indexs.resize(_batch_size + 1); + // Use _initial_batch_size for buffer sizing to ensure find_batch has enough space. + _probe_indexs.resize(_initial_batch_size + 1); + _build_indexs.resize(_initial_batch_size + 1); if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && _have_other_join_conjunct) { - _null_flags.resize(_batch_size + 1); + _null_flags.resize(_initial_batch_size + 1); } if (!_parent->_ready_probe) { @@ -210,6 +212,35 @@ typename HashTableType::State ProcessHashTableProbe::_init_probe_sid int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false); COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage); COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage); + + // Pre-estimate bytes per row to limit batch size. Only include sides that + // actually contribute columns to the output: left-semi/anti joins do not + // output build columns, right-semi/anti joins do not output probe columns. + // Without this filtering, the first batch can be substantially smaller than + // _block_max_bytes allows. Subsequent batches are further adjusted by + // process() based on the observed output bytes-per-row. 
+ { + constexpr bool outputs_build_side = JoinOpType != TJoinOp::LEFT_SEMI_JOIN && + JoinOpType != TJoinOp::LEFT_ANTI_JOIN && + JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN && + JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + constexpr bool outputs_probe_side = JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_ANTI_JOIN; + size_t total_bytes_per_row = 0; + if (outputs_build_side && _build_block && _build_block->rows() > 0) { + total_bytes_per_row += _build_block->bytes() / _build_block->rows(); + } + auto& probe_block = _parent->_probe_block; + if (outputs_probe_side && probe_block.rows() > 0) { + total_bytes_per_row += probe_block.bytes() / probe_block.rows(); + } + if (total_bytes_per_row > 0) { + int bytes_limited = std::max( + 1, static_cast(std::min(_block_max_bytes / total_bytes_per_row, + static_cast(INT_MAX)))); + _batch_size = std::min(_batch_size, bytes_limited); + } + } } return typename HashTableType::State(_parent->_probe_columns); @@ -514,7 +545,7 @@ Status ProcessHashTableProbe::process(HashTableType& hash_table_ctx, hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, build_index, probe_rows, _probe_indexs.get_data().data(), _build_indexs.get_data().data(), _null_flags.data(), _picking_null_keys, - null_map); + null_map, _batch_size); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; @@ -539,7 +570,7 @@ Status ProcessHashTableProbe::process(HashTableType& hash_table_ctx, build_index, cast_set(probe_rows), _probe_indexs.get_data().data(), _probe_visited, _build_indexs.get_data().data(), null_map, _have_other_join_conjunct, is_mark_join, - !_parent->_mark_join_conjuncts.empty()); + !_parent->_mark_join_conjuncts.empty(), _batch_size); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; @@ -560,6 +591,18 @@ Status ProcessHashTableProbe::process(HashTableType& hash_table_ctx, DCHECK_EQ(current_offset, 
output_block->rows()); COUNTER_UPDATE(_parent->_intermediate_rows_counter, current_offset); + // Adaptively adjust batch size based on output block bytes. + if (current_offset > 0) { + size_t block_bytes = output_block->bytes(); + size_t bytes_per_row = block_bytes / current_offset; + if (bytes_per_row > 0) { + int new_batch_size = + std::max(1, static_cast(std::min(_block_max_bytes / bytes_per_row, + static_cast(INT_MAX)))); + _batch_size = std::min(_initial_batch_size, new_batch_size); + } + } + // For ASOF JOIN, skip conjuncts filtering since we already found best match if constexpr (is_asof_join) { return Status::OK(); @@ -974,11 +1017,11 @@ Status ProcessHashTableProbe< if (is_mark_join) { std::unique_ptr mark_column = std::make_unique(*mcol[mcol.size() - 1]); - *eos = hash_table_ctx.hash_table->template iterate_map(_build_indexs, - mark_column.get()); + *eos = hash_table_ctx.hash_table->template iterate_map( + _build_indexs, mark_column.get(), _batch_size); } else { - *eos = hash_table_ctx.hash_table->template iterate_map(_build_indexs, - nullptr); + *eos = hash_table_ctx.hash_table->template iterate_map( + _build_indexs, nullptr, _batch_size); } auto block_size = _build_indexs.size(); @@ -1039,7 +1082,19 @@ Status ProcessHashTableProbe< } } output_block->swap(mutable_block.to_block(0)); - DCHECK(block_size <= _batch_size); + + // Adaptively adjust batch size based on output block bytes. 
+ if (block_size > 0) { + size_t block_bytes = output_block->bytes(); + size_t bytes_per_row = block_bytes / block_size; + if (bytes_per_row > 0) { + int new_batch_size = + std::max(1, static_cast(std::min(_block_max_bytes / bytes_per_row, + static_cast(INT_MAX)))); + _batch_size = std::min(_initial_batch_size, new_batch_size); + } + } + DCHECK(block_size <= _initial_batch_size); } return Status::OK(); } diff --git a/be/src/exec/operator/local_merge_sort_source_operator.cpp b/be/src/exec/operator/local_merge_sort_source_operator.cpp index dacd97357d391e..3777fe91c7541d 100644 --- a/be/src/exec/operator/local_merge_sort_source_operator.cpp +++ b/be/src/exec/operator/local_merge_sort_source_operator.cpp @@ -66,9 +66,9 @@ Status LocalMergeSortLocalState::build_merger(RuntimeState* state) { for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i])); } - _merger = std::make_unique(ordering_expr_ctxs, p._is_asc_order, - p._nulls_first, state->batch_size(), p._limit, - p._offset, custom_profile()); + _merger = std::make_unique( + ordering_expr_ctxs, p._is_asc_order, p._nulls_first, state->batch_size(), p._limit, + p._offset, custom_profile(), state->preferred_block_size_bytes()); std::vector child_block_suppliers; for (auto sorter : p._sorters) { BlockSupplier block_supplier = [sorter, state](Block* block, bool* eos) { diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.cpp b/be/src/exec/operator/nested_loop_join_probe_operator.cpp index 7a3be55cbb1988..3587a29ee971c8 100644 --- a/be/src/exec/operator/nested_loop_join_probe_operator.cpp +++ b/be/src/exec/operator/nested_loop_join_probe_operator.cpp @@ -25,7 +25,6 @@ #include "core/column/column.h" #include "core/column/column_filter_helper.h" #include "exec/operator/operator.h" - namespace doris { class RuntimeState; } // namespace doris @@ -193,7 +192,9 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* 
sta return build_blocks[_current_build_pos].rows(); }; - while (_join_block.rows() + add_rows() <= state->batch_size()) { + size_t effective_max_rows = _budget.max_rows; + bool bytes_estimated = false; + while (_join_block.rows() + add_rows() <= effective_max_rows) { while (_current_build_pos == _shared_state->build_blocks.size() || _probe_block_pos == probe_block->rows()) { // if probe block is empty(), do not need disprocess the probe block rows @@ -229,11 +230,19 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* sta SCOPED_TIMER(_output_temp_blocks_timer); process_probe_block(_probe_block_pos, _join_block, *probe_block, p._num_probe_side_columns, now_process_build_block, p._num_build_side_columns); + + // Refine row limit based on actual bytes per row after first data-producing iteration + if (!bytes_estimated && _join_block.rows() > 0) { + bytes_estimated = true; + size_t bytes_per_row = _join_block.bytes() / _join_block.rows(); + if (bytes_per_row > 0) { + effective_max_rows = _budget.effective_max_rows(bytes_per_row); + } + } } - DCHECK_LE(_join_block.rows(), state->batch_size()) - << "join block rows:" << _join_block.rows() - << ", state batch size:" << state->batch_size() + DCHECK_LE(_join_block.rows(), _budget.max_rows) + << "join block rows:" << _join_block.rows() << ", state batch size:" << _budget.max_rows << "probe_block rows:" << probe_block->rows() << " build blocks size:" << _shared_state->build_blocks.size(); } @@ -268,7 +277,9 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_build(RuntimeState* sta return; } - while (_join_block.rows() + probe_rows <= state->batch_size()) { + size_t effective_max_rows = _budget.max_rows; + bool bytes_estimated = false; + while (_join_block.rows() + probe_rows <= effective_max_rows) { // The current build row has processed the entire probe block; move to the next build row if (_probe_block_pos == probe_rows) { // Move to the next build row and reset the probe position @@ 
-300,11 +311,17 @@ void NestedLoopJoinProbeLocalState::_generate_block_base_build(RuntimeState* sta // Mark the current probe block as processed; the next loop will move to the next build row _probe_block_pos = probe_rows; + + // Refine row limit based on actual bytes per row after first data-producing iteration + if (!bytes_estimated && _join_block.rows() > 0) { + bytes_estimated = true; + effective_max_rows = + _budget.effective_max_rows(_join_block.bytes() / _join_block.rows()); + } } - DCHECK_LE(_join_block.rows(), state->batch_size()) - << "join block rows:" << _join_block.rows() - << ", state batch size:" << state->batch_size() + DCHECK_LE(_join_block.rows(), _budget.max_rows) + << "join block rows:" << _join_block.rows() << ", state batch size:" << _budget.max_rows << "probe_block rows:" << probe_block->rows() << "build block rows:" << build_block.rows(); } @@ -367,7 +384,7 @@ Status NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat // probe row with null from build side. 
if (_probe_side_process_count) { _finalize_current_phase( - _join_block, state->batch_size()); + _join_block); } } else if (_probe_side_process_count && p._is_mark_join && _shared_state->build_blocks.empty()) { @@ -387,23 +404,29 @@ Status NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat if (_matched_rows_done && _output_null_idx_build_side < _shared_state->build_blocks.size()) { _finalize_current_phase( - _join_block, state->batch_size()); + _join_block); } } return Status::OK(); } template -void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t batch_size) { +void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block) { auto& p = _parent->cast(); auto dst_columns = block.mutate_columns(); DCHECK_GT(dst_columns.size(), 0); - auto column_size = dst_columns[0]->size(); + auto current_row_count = dst_columns[0]->size(); + // Precompute effective batch size from estimated bytes per row to avoid per-iteration byte sum + size_t effective_batch_size = _budget.max_rows; + if (current_row_count > 0) { + const size_t current_bytes = Block::columns_byte_size(dst_columns); + effective_batch_size = _budget.effective_max_rows(current_bytes / current_row_count); + } if constexpr (BuildSide) { DCHECK(!p._is_mark_join); auto build_block_sz = _shared_state->build_blocks.size(); size_t i = _output_null_idx_build_side; - for (; i < build_block_sz && column_size < batch_size; i++) { + for (; i < build_block_sz && current_row_count < effective_batch_size; i++) { const auto& cur_block = _shared_state->build_blocks[i]; const auto* __restrict cur_visited_flags = assert_cast(_shared_state->build_side_visited_flags[i].get()) @@ -425,7 +448,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t } } - column_size += selector_idx; + current_row_count += selector_idx; for (size_t j = 0; j < p._num_probe_side_columns; ++j) { DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN || @@ -445,7 +468,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t assert_cast(dst_columns[p._num_probe_side_columns + j].get()) ->get_null_map_column() .get_data() - .resize_fill(column_size, 0); + .resize_fill(current_row_count, 0); } else { dst_columns[p._num_probe_side_columns + j]->insert_indices_from( *src_column.column.get(), selector.data(), @@ -456,7 +479,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t _output_null_idx_build_side = i; } else { if (!p._is_mark_join) { - auto new_size = column_size; + auto new_size = current_row_count; DCHECK_LE(_probe_block_start_pos + _probe_side_process_count, _child_block->rows()); for (int j = _probe_block_start_pos; j < _probe_block_start_pos + _probe_side_process_count; ++j) { @@ -479,10 +502,10 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t } } } - if (new_size > column_size) { + if (new_size > current_row_count) { for (size_t i = 0; i < p._num_build_side_columns; ++i) { - dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(new_size - - column_size); + dst_columns[p._num_probe_side_columns + i]->insert_many_defaults( + new_size - current_row_count); } } } else { diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.h b/be/src/exec/operator/nested_loop_join_probe_operator.h index aea6972f428bec..256dcfe138c134 100644 --- a/be/src/exec/operator/nested_loop_join_probe_operator.h +++ b/be/src/exec/operator/nested_loop_join_probe_operator.h @@ -81,7 +81,7 @@ class NestedLoopJoinProbeLocalState final friend class NestedLoopJoinProbeOperatorX; void _update_additional_flags(Block* block); template - void _finalize_current_phase(Block& block, size_t batch_size); + void _finalize_current_phase(Block& block); void _reset_with_next_probe_row(); void _append_probe_data_with_null(Block& block) const; template diff --git a/be/src/exec/operator/set_source_operator.cpp 
b/be/src/exec/operator/set_source_operator.cpp index a314f411311069..c0550570ff9a19 100644 --- a/be/src/exec/operator/set_source_operator.cpp +++ b/be/src/exec/operator/set_source_operator.cpp @@ -23,8 +23,11 @@ #include "common/status.h" #include "exec/operator/operator.h" #include "runtime/runtime_profile.h" - namespace doris { +template +SetSourceLocalState::SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + template Status SetSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -80,6 +83,10 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, Block* b SCOPED_TIMER(local_state.exec_time_counter()); SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); + // Compute effective batch size based on estimated bytes per row. + const size_t effective_batch_size = + local_state.block_budget().effective_max_rows(local_state._estimated_row_bytes); + _create_mutable_cols(local_state, block); { SCOPED_TIMER(local_state._get_data_timer); @@ -87,8 +94,9 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, Block* b [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - return _get_data_in_hashtable(local_state, arg, block, - state->batch_size(), eos); + return _get_data_in_hashtable( + local_state, arg, block, static_cast(effective_batch_size), + eos); } else { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); @@ -101,6 +109,9 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, Block* b RETURN_IF_ERROR( VExprContext::filter_block(local_state._conjuncts, block, block->columns())); } + if (block->rows() > 0) { + local_state._estimated_row_bytes = block->bytes() / block->rows(); + } local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/exec/operator/set_source_operator.h b/be/src/exec/operator/set_source_operator.h index 40887f35fab3f1..5c3e6674fe453c 100644 --- 
a/be/src/exec/operator/set_source_operator.h +++ b/be/src/exec/operator/set_source_operator.h @@ -34,7 +34,7 @@ class SetSourceLocalState final : public PipelineXLocalState { ENABLE_FACTORY_CREATOR(SetSourceLocalState); using Base = PipelineXLocalState; using Parent = SetSourceOperatorX; - SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + SetSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& infos) override; Status open(RuntimeState* state) override; @@ -51,6 +51,7 @@ class SetSourceLocalState final : public PipelineXLocalState { RuntimeProfile::Counter* _get_data_from_hashtable_rows = nullptr; IColumn::Selector _result_indexs; bool _null_key_output = false; + size_t _estimated_row_bytes = 0; }; template diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp b/be/src/exec/operator/spill_sort_source_operator.cpp index a745bf2858d45b..5effe8d15a4b8d 100644 --- a/be/src/exec/operator/spill_sort_source_operator.cpp +++ b/be/src/exec/operator/spill_sort_source_operator.cpp @@ -155,7 +155,8 @@ Status SpillSortLocalState::_create_intermediate_merger(RuntimeState* state, int } _merger = std::make_unique(sort_description, state->batch_size(), limit, - offset, custom_profile()); + offset, custom_profile(), + state->preferred_block_size_bytes()); _current_merging_files.clear(); _current_merging_readers.clear(); diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp b/be/src/exec/operator/streaming_aggregation_operator.cpp index 5744b288a4487e..ef3526706e06e7 100644 --- a/be/src/exec/operator/streaming_aggregation_operator.cpp +++ b/be/src/exec/operator/streaming_aggregation_operator.cpp @@ -31,7 +31,6 @@ #include "exprs/aggregate/aggregate_function_simple_factory.h" #include "exprs/vectorized_agg_fn.h" #include "exprs/vslot_ref.h" - namespace doris { class RuntimeState; } // namespace doris @@ -468,6 +467,9 @@ Status 
StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st } } + // Compute effective max rows based on estimated bytes per row from previous calls. + const size_t effective_max_rows = _budget.effective_max_rows(_estimated_row_bytes); + std::visit( Overload { [&](std::monostate& arg) -> void { @@ -476,7 +478,7 @@ Status StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st [&](auto& agg_method) -> void { agg_method.init_iterator(); auto& data = *agg_method.hash_table; - const auto size = std::min(data.size(), size_t(state->batch_size())); + const auto size = std::min(data.size(), effective_max_rows); using KeyType = std::decay_t::Key; std::vector keys(size); @@ -501,7 +503,7 @@ Status StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st { SCOPED_TIMER(_hash_table_iterate_timer); auto& it = agg_method.begin; - while (it != agg_method.end && num_rows < state->batch_size()) { + while (it != agg_method.end && num_rows < effective_max_rows) { keys[num_rows] = it.get_first(); auto inline_count = reinterpret_cast(it.get_second()); @@ -523,7 +525,7 @@ Status StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st if (agg_method.hash_table->has_null_key_data()) { DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); - if (num_rows < state->batch_size()) { + if (num_rows < effective_max_rows) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.hash_table->template get_null_key_data< @@ -552,7 +554,7 @@ Status StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st { SCOPED_TIMER(_hash_table_iterate_timer); while (iter != _aggregate_data_container->end() && - num_rows < state->batch_size()) { + num_rows < effective_max_rows) { keys[num_rows] = iter.template get_key(); _values[num_rows] = iter.get_aggregate_data(); ++iter; @@ -619,6 +621,10 @@ Status StreamingAggLocalState::_get_results_with_serialized_key(RuntimeState* st *block = 
Block(columns_with_schema); } + if (block->rows() > 0) { + _estimated_row_bytes = block->bytes() / block->rows(); + } + return Status::OK(); } diff --git a/be/src/exec/operator/streaming_aggregation_operator.h b/be/src/exec/operator/streaming_aggregation_operator.h index 40a8de2824446a..abc289abc1a94a 100644 --- a/be/src/exec/operator/streaming_aggregation_operator.h +++ b/be/src/exec/operator/streaming_aggregation_operator.h @@ -101,6 +101,7 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public PipelineXLocalStatebatch_size() - cast_set(columns[p._child_slots.size()]->size()); if (remaining_capacity <= 0) { return Status::OK(); } + // Reduce capacity based on block_max_bytes using estimated bytes per output row. + const auto block_max_bytes = state->preferred_block_size_bytes(); + if (_child_block->rows() > 0) { + size_t estimated_bytes_per_row = _child_block->bytes() / _child_block->rows(); + if (estimated_bytes_per_row > 0) { + size_t current_bytes = 0; + for (const auto& col : columns) { + current_bytes += col->byte_size(); + } + if (current_bytes >= block_max_bytes) { + return Status::OK(); + } + int bytes_capacity = static_cast( + std::min((block_max_bytes - current_bytes) / estimated_bytes_per_row, + static_cast(INT_MAX))); + remaining_capacity = std::min(remaining_capacity, std::max(1, bytes_capacity)); + } + } + const auto& offsets = *_block_fast_path_ctx.offsets_ptr; const auto child_rows = cast_set(offsets.size()); @@ -511,7 +530,7 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, Block* o if (use_slow_path) { bool skip_child_row = false; - while (columns[p._child_slots.size()]->size() < state->batch_size()) { + while (_budget.within_budget(columns[p._child_slots.size()]->size(), m_block.bytes())) { RETURN_IF_CANCELLED(state); if (_child_block->rows() == 0) { @@ -594,9 +613,11 @@ Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(RuntimeS child_row_to_output_rows_indices.push_back(0); } - auto 
batch_size = state->batch_size(); auto output_row_count = columns[child_slot_count]->size(); - while (output_row_count < batch_size) { + auto within_block_limit = [&]() -> bool { + return _budget.within_budget(output_row_count, m_block.bytes()); + }; + while (within_block_limit()) { RETURN_IF_CANCELLED(state); // finished handling current child block @@ -605,7 +626,7 @@ Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(RuntimeS } bool skip_child_row = false; - while (output_row_count < batch_size) { + while (within_block_limit()) { // if table function is not outer and has empty result, go to next child row if (_fns[0]->eos() || skip_child_row) { _copy_output_slots(columns, p); @@ -625,8 +646,9 @@ Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(RuntimeS // It may take multiple iterations of this while loop to process a child row if // the table function produces a large number of rows. - auto repeat_times = _fns[0]->get_value(columns[child_slot_count], - batch_size - (int)output_row_count); + auto repeat_times = + _fns[0]->get_value(columns[child_slot_count], + static_cast(_budget.max_rows - output_row_count)); _current_row_insert_times += repeat_times; output_row_count = columns[child_slot_count]->size(); } @@ -636,7 +658,7 @@ Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(RuntimeS // _cur_child_offset == -1 // 2. 
output_block reaches batch size // fn maybe or maybe not eos - if (output_row_count >= batch_size) { + if (_budget.exceeded(output_row_count, m_block.bytes())) { _copy_output_slots(columns, p); handled_row_indices.push_back(_cur_child_offset); child_row_to_output_rows_indices.push_back(output_row_count); diff --git a/be/src/exec/operator/union_sink_operator.cpp b/be/src/exec/operator/union_sink_operator.cpp index dc6fb2e0d9a1e3..a8517b2dc3b7da 100644 --- a/be/src/exec/operator/union_sink_operator.cpp +++ b/be/src/exec/operator/union_sink_operator.cpp @@ -133,7 +133,9 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos) return Status::OK(); } // not eos and block rows is enough to output,so push block - if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size())) { + const auto block_max_bytes = state->preferred_block_size_bytes(); + if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size() || + local_state._output_block->bytes() >= block_max_bytes)) { RETURN_IF_ERROR(local_state._shared_state->data_queue.push_block( std::move(local_state._output_block), _cur_child_id)); } diff --git a/be/src/exec/operator/union_source_operator.cpp b/be/src/exec/operator/union_source_operator.cpp index a484f1e4a324ba..795d496a75c452 100644 --- a/be/src/exec/operator/union_source_operator.cpp +++ b/be/src/exec/operator/union_source_operator.cpp @@ -35,6 +35,9 @@ namespace doris { class RuntimeState; +UnionSourceLocalState::UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); @@ -152,7 +155,8 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, Block* block) { MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, row_descriptor()); ColumnsWithTypeAndName 
tmp_block_columns; - for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size(); + for (; _const_expr_list_idx < _const_expr_lists.size() && + local_state.block_budget().within_budget(mblock.rows(), mblock.bytes()); ++_const_expr_list_idx) { int const_expr_lists_size = cast_set(_const_expr_lists[_const_expr_list_idx].size()); if (_const_expr_list_idx && const_expr_lists_size != _const_expr_lists[0].size()) { diff --git a/be/src/exec/operator/union_source_operator.h b/be/src/exec/operator/union_source_operator.h index 748c9a48e60c55..3f22f1bfb3ff7b 100644 --- a/be/src/exec/operator/union_source_operator.h +++ b/be/src/exec/operator/union_source_operator.h @@ -36,7 +36,7 @@ class UnionSourceLocalState final : public PipelineXLocalState ENABLE_FACTORY_CREATOR(UnionSourceLocalState); using Base = PipelineXLocalState; using Parent = UnionSourceOperatorX; - UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; @@ -140,4 +140,4 @@ class UnionSourceOperatorX MOCK_REMOVE(final) : public OperatorX _const_expr_lists; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp index 3d36e1f10eb888..ad77f7c50e0ad8 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp +++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp @@ -339,7 +339,8 @@ Status VIcebergSortWriter::_create_merger(bool is_final_merge, size_t batch_size // Create a multi-way merge sorter that reads from multiple sorted spill streams std::vector child_block_suppliers; _merger = std::make_unique(_sorter->get_sort_description(), batch_size, -1, 0, - _profile); + _profile, + 
_runtime_state->preferred_block_size_bytes()); _current_merging_spill_files.clear(); // For final merge: merge all remaining streams diff --git a/be/src/exec/sort/heap_sorter.cpp b/be/src/exec/sort/heap_sorter.cpp index ab5e898e67a643..cb463c3e23d921 100644 --- a/be/src/exec/sort/heap_sorter.cpp +++ b/be/src/exec/sort/heap_sorter.cpp @@ -97,7 +97,8 @@ Status HeapSorter::prepare_for_read(bool is_spill) { } Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - return _state->merge_sort_read(block, state->batch_size(), eos); + return _state->merge_sort_read(block, state->batch_size(), state->preferred_block_size_bytes(), + eos); } Field HeapSorter::get_top_value() { diff --git a/be/src/exec/sort/partition_sorter.cpp b/be/src/exec/sort/partition_sorter.cpp index 64422a202c236f..e0c232a9188d29 100644 --- a/be/src/exec/sort/partition_sorter.cpp +++ b/be/src/exec/sort/partition_sorter.cpp @@ -90,13 +90,14 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { if (_top_n_algorithm == TopNAlgorithm::ROW_NUMBER) { - return _read_row_num(block, eos, state->batch_size()); + return _read_row_num(block, eos, state->batch_size(), state->preferred_block_size_bytes()); } else { - return _read_row_rank(block, eos, state->batch_size()); + return _read_row_rank(block, eos, state->batch_size(), state->preferred_block_size_bytes()); } } -Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_size) { +Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_size, + size_t block_max_bytes) { auto& queue = _state->get_queue(); size_t num_columns = _state->unsorted_block()->columns(); @@ -145,12 +146,18 @@ Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_ } else { queue.remove_top(); } + + // block_max_bytes == 0 means no byte budget. 
+ if (block_max_bytes > 0 && merged_rows > 0 && m_block.bytes() >= block_max_bytes) { + break; + } } return Status::OK(); } -Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) { +Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size, + size_t block_max_bytes) { auto& queue = _state->get_queue(); size_t num_columns = _state->unsorted_block()->columns(); @@ -193,6 +200,12 @@ Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch } else { queue.remove_top(); } + + // block_max_bytes == 0 means no byte budget. + if (block_max_bytes > 0 && (merged_rows & 255) == 0 && + m_block.bytes() >= block_max_bytes) { + return Status::OK(); + } } } diff --git a/be/src/exec/sort/partition_sorter.h b/be/src/exec/sort/partition_sorter.h index 707d992a0d7833..8fcb2c79301c78 100644 --- a/be/src/exec/sort/partition_sorter.h +++ b/be/src/exec/sort/partition_sorter.h @@ -95,8 +95,8 @@ class PartitionSorter final : public Sorter { void set_prepared_finish() { _prepared_finish = true; } private: - Status _read_row_num(Block* block, bool* eos, int batch_size); - Status _read_row_rank(Block* block, bool* eos, int batch_size); + Status _read_row_num(Block* block, bool* eos, int batch_size, size_t block_max_bytes); + Status _read_row_rank(Block* block, bool* eos, int batch_size, size_t block_max_bytes); bool _get_enough_data() const { if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) { // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2 diff --git a/be/src/exec/sort/sorter.cpp b/be/src/exec/sort/sorter.cpp index 88160819328ce0..bffeebb32009b7 100644 --- a/be/src/exec/sort/sorter.cpp +++ b/be/src/exec/sort/sorter.cpp @@ -86,14 +86,16 @@ Status MergeSorterState::build_merge_tree(const SortDescription& sort_descriptio return Status::OK(); } -Status MergeSorterState::merge_sort_read(doris::Block* block, int batch_size, bool* eos) { +Status 
MergeSorterState::merge_sort_read(doris::Block* block, int batch_size, + size_t block_max_bytes, bool* eos) { DCHECK(_sorted_blocks.empty()); DCHECK(unsorted_block()->empty()); - _merge_sort_read_impl(batch_size, block, eos); + _merge_sort_read_impl(batch_size, block_max_bytes, block, eos); return Status::OK(); } -void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block* block, bool* eos) { +void MergeSorterState::_merge_sort_read_impl(int batch_size, size_t block_max_bytes, + doris::Block* block, bool* eos) { size_t num_columns = unsorted_block()->columns(); MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block()); @@ -123,6 +125,11 @@ void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block* block } else { _queue.remove_top(); } + + // block_max_bytes == 0 means no byte budget. + if (block_max_bytes > 0 && merged_rows > 0 && m_block.bytes() >= block_max_bytes) { + break; + } } block->set_columns(std::move(merged_columns)); @@ -258,12 +265,13 @@ Status FullSorter::prepare_for_read(bool is_spill) { } Status FullSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - return _state->merge_sort_read(block, state->batch_size(), eos); + return _state->merge_sort_read(block, state->batch_size(), state->preferred_block_size_bytes(), + eos); } Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::Block* block, int batch_size, bool* eos) { - return _state->merge_sort_read(block, batch_size, eos); + return _state->merge_sort_read(block, batch_size, state->preferred_block_size_bytes(), eos); } Status FullSorter::do_sort() { diff --git a/be/src/exec/sort/sorter.h b/be/src/exec/sort/sorter.h index 1651247eecc1ab..590e2009d2a647 100644 --- a/be/src/exec/sort/sorter.h +++ b/be/src/exec/sort/sorter.h @@ -65,7 +65,7 @@ class MergeSorterState { Status build_merge_tree(const SortDescription& sort_description); - Status merge_sort_read(doris::Block* block, int batch_size, bool* eos); 
+ Status merge_sort_read(doris::Block* block, int batch_size, size_t block_max_bytes, bool* eos); size_t data_size() const { size_t size = _unsorted_block->bytes(); @@ -85,7 +85,8 @@ class MergeSorterState { void ignore_offset() { _offset = 0; } private: - void _merge_sort_read_impl(int batch_size, doris::Block* block, bool* eos); + void _merge_sort_read_impl(int batch_size, size_t block_max_bytes, doris::Block* block, + bool* eos); std::unique_ptr _unsorted_block; MergeSorterQueue _queue; diff --git a/be/src/exec/sort/topn_sorter.cpp b/be/src/exec/sort/topn_sorter.cpp index fc84cc777a7add..e5b1f6edb64599 100644 --- a/be/src/exec/sort/topn_sorter.cpp +++ b/be/src/exec/sort/topn_sorter.cpp @@ -54,7 +54,8 @@ Status TopNSorter::prepare_for_read(bool is_spill) { } Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - return _state->merge_sort_read(block, state->batch_size(), eos); + return _state->merge_sort_read(block, state->batch_size(), state->preferred_block_size_bytes(), + eos); } Status TopNSorter::_do_sort(Block* block) { diff --git a/be/src/exec/sort/vsorted_run_merger.cpp b/be/src/exec/sort/vsorted_run_merger.cpp index ce4440c3178343..2e41c6f1f962cd 100644 --- a/be/src/exec/sort/vsorted_run_merger.cpp +++ b/be/src/exec/sort/vsorted_run_merger.cpp @@ -34,20 +34,24 @@ namespace doris { VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first, const size_t batch_size, - int64_t limit, size_t offset, RuntimeProfile* profile) + int64_t limit, size_t offset, RuntimeProfile* profile, + size_t block_max_bytes) : _ordering_expr(ordering_expr), _is_asc_order(is_asc_order), _nulls_first(nulls_first), _batch_size(batch_size), + _block_max_bytes(block_max_bytes), _limit(limit), _offset(offset) { init_timers(profile); } VSortedRunMerger::VSortedRunMerger(const SortDescription& desc, const size_t batch_size, - int64_t limit, size_t offset, RuntimeProfile* profile) + 
int64_t limit, size_t offset, RuntimeProfile* profile, + size_t block_max_bytes) : _desc(desc), _batch_size(batch_size), + _block_max_bytes(block_max_bytes), _use_sort_desc(true), _limit(limit), _offset(offset), @@ -114,6 +118,8 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { *eos = true; return Status::OK(); } else if (_priority_queue.size() == 1) { + // Single-run fast path: cut/swap the supplier block into output. + // Uses column->cut() which is O(columns) instead of row-by-row merge O(rows*columns). auto current = _priority_queue.top(); DCHECK(!current->eof()); DCHECK(current->block_ptr() != nullptr); @@ -132,21 +138,46 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } - if (!current->is_first()) { - for (int i = 0; i < current->block->columns(); i++) { - auto& column_with_type = current->block_ptr()->get_by_position(i); - column_with_type.column = - column_with_type.column->cut(current->pos, current->rows - current->pos); + size_t remaining = current->rows - current->pos; + size_t output_rows = std::min(remaining, _batch_size); + + // Apply byte budget: estimate rows that fit within _block_max_bytes. + // _block_max_bytes == 0 means no byte budget (e.g., iceberg sort writer). + if (_block_max_bytes > 0 && remaining > 1 && current->block_ptr()->bytes() > 0) { + size_t bytes_per_row = current->block_ptr()->bytes() / current->rows; + if (bytes_per_row > 0) { + size_t byte_limited = _block_max_bytes / bytes_per_row; + output_rows = std::min(output_rows, std::max(size_t(1), byte_limited)); } } - current->block_ptr()->swap(*output_block); - current->next(current->rows - current->pos); - if (current->eof()) { - *eos = true; + + if (current->is_first() && output_rows == remaining) { + // Entire block fits — zero-copy swap. + current->block_ptr()->swap(*output_block); } else { - _pending_cursor = current.impl; + // Build output block from cut columns without modifying the supplier block. 
+ // The cursor (and its underlying block) is intentionally left in the priority + // queue so that the next get_next() call can continue slicing the remaining + // rows. MergeSortBlockCursor wraps a shared impl pointer, so advancing the + // cursor below via current->next(output_rows) updates the queue's view of + // the same cursor in place. + auto* src_block = current->block_ptr(); + output_block->clear(); + for (int i = 0; i < src_block->columns(); i++) { + auto col_with_type = src_block->get_by_position(i); + col_with_type.column = col_with_type.column->cut(current->pos, output_rows); + output_block->insert(std::move(col_with_type)); + } + } + current->next(output_rows); + if (output_rows >= remaining) { + _priority_queue.pop(); + if (current->eof()) { + *eos = true; + } else { + _pending_cursor = current.impl; + } } - _priority_queue.pop(); return Status::OK(); } else { size_t num_columns = _priority_queue.top().impl->block->columns(); @@ -196,6 +227,14 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { do_insert(); return Status::OK(); } + + if (merged_rows > 0 && (merged_rows & 255) == 0 && !_indexs.empty()) { + do_insert(); + // _block_max_bytes == 0 means no byte budget. + if (_block_max_bytes > 0 && m_block.bytes() >= _block_max_bytes) { + break; + } + } } do_insert(); output_block->set_columns(std::move(merged_columns)); diff --git a/be/src/exec/sort/vsorted_run_merger.h b/be/src/exec/sort/vsorted_run_merger.h index 191db39f295255..135e76a1f8cfc9 100644 --- a/be/src/exec/sort/vsorted_run_merger.h +++ b/be/src/exec/sort/vsorted_run_merger.h @@ -45,10 +45,10 @@ class VSortedRunMerger { // batch being returned. 
VSortedRunMerger(const VExprContextSPtrs& ordering_expr, const std::vector& _is_asc_order, const std::vector& _nulls_first, const size_t batch_size, int64_t limit, - size_t offset, RuntimeProfile* profile); + size_t offset, RuntimeProfile* profile, size_t block_max_bytes); VSortedRunMerger(const SortDescription& desc, const size_t batch_size, int64_t limit, - size_t offset, RuntimeProfile* profile); + size_t offset, RuntimeProfile* profile, size_t block_max_bytes); virtual ~VSortedRunMerger() = default; @@ -66,6 +66,7 @@ class VSortedRunMerger { const std::vector _is_asc_order; const std::vector _nulls_first; const size_t _batch_size; + const size_t _block_max_bytes; bool _use_sort_desc = false; size_t _num_rows_returned = 0; diff --git a/be/src/format/generic_reader.h b/be/src/format/generic_reader.h index 57e9d1736b7045..606024f5d4d43c 100644 --- a/be/src/format/generic_reader.h +++ b/be/src/format/generic_reader.h @@ -261,10 +261,6 @@ class GenericReader : public ProfileCollector { const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding) - // never let batch size be 0 because _do_get_next_block uses it as the - // upper bound of a `while (block->rows() < batch_size)` loop and a 0 would make the reader - // return without setting eof, causing the scanner to spin on empty blocks. - const size_t _DEFAULT_BATCH_SIZE = 4064; // 4094 - 32(padding) TPushAggOp::type _push_down_agg_type {}; public: diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h index 6332ab51fc2089..bf5a27535dd13a 100644 --- a/be/src/storage/rowset/rowset_reader_context.h +++ b/be/src/storage/rowset/rowset_reader_context.h @@ -76,9 +76,6 @@ struct RowsetReaderContext { // Effective adaptive batch size byte budget. 0 means disabled internally. size_t preferred_block_size_bytes = 8388608UL; - // Points to the "true" output column list before non-direct-mode expansion. 
- // Used by BlockReader to map expanded storage columns back to the requested output layout. - const std::vector* origin_return_columns = nullptr; bool is_unique = false; //record row num merged in generic iterator uint64_t* merged_rows = nullptr; diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp index 55d256c634deda..73d68e883257f9 100644 --- a/be/src/storage/tablet/tablet_reader.cpp +++ b/be/src/storage/tablet/tablet_reader.cpp @@ -201,10 +201,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW _reader_context.general_read_limit = read_params.general_read_limit; - // Preserve the original requested output layout so BlockReader can map expanded storage - // columns (for non-direct AGG/UNIQUE paths) back to the final output block. - _reader_context.origin_return_columns = read_params.origin_return_columns; - return Status::OK(); } diff --git a/be/test/core/value/sort_merger_test.cpp b/be/test/core/value/sort_merger_test.cpp index b5c0b8ced95e2a..0f1472ad118f91 100644 --- a/be/test/core/value/sort_merger_test.cpp +++ b/be/test/core/value/sort_merger_test.cpp @@ -17,6 +17,8 @@ #include +#include + #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "exec/sort/vsorted_run_merger.h" @@ -45,6 +47,8 @@ TEST(SortMergerTest, NULL_FIRST_ASC) { */ const int num_children = 5; const int batch_size = 5; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit std::vector round; round.resize(num_children, 0); const int num_round = 2; @@ -59,7 +63,7 @@ TEST(SortMergerTest, NULL_FIRST_ASC) { const int limit = -1; const int offset = 0; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector 
child_block_suppliers; @@ -144,8 +148,10 @@ TEST(SortMergerTest, NULL_LAST_DESC) { std::vector nulls_first = {false}; const int limit = -1; const int offset = 0; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -213,6 +219,8 @@ TEST(SortMergerTest, TEST_LIMIT) { */ const int num_children = 5; const int batch_size = 5; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit std::vector round; round.resize(num_children, 0); const int num_round = 2; @@ -227,7 +235,7 @@ TEST(SortMergerTest, TEST_LIMIT) { const int limit = 1; const int offset = 20; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -272,6 +280,8 @@ TEST(SortMergerTest, LAST_BLOCK_WITH_EOS) { */ const int num_children = 5; const int batch_size = 5; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit std::vector round; round.resize(num_children, 0); const int num_round = 1; @@ -286,7 +296,7 @@ TEST(SortMergerTest, LAST_BLOCK_WITH_EOS) { const int limit = -1; const int offset = 0; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -336,6 +346,7 @@ TEST(SortMergerTest, TEST_BIG_OFFSET_SINGLE_STREAM) { */ const int num_children = 1; const int batch_size = 5; + const size_t block_max_bytes = 1 * 1024 * 1024; // 1MB std::vector round; round.resize(num_children, 0); const int num_round = 1; @@ -350,7 +361,7 @@ 
TEST(SortMergerTest, TEST_BIG_OFFSET_SINGLE_STREAM) { const int limit = 1; const int offset = 20; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -385,6 +396,8 @@ TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) { */ const int num_children = 1; const int batch_size = 5; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit std::vector round; round.resize(num_children, 0); const int num_round = 1; @@ -399,7 +412,7 @@ TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) { const int limit = 1; const int offset = 4; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -435,6 +448,8 @@ TEST(SortMergerTest, TEST_SINGLE_STREAM) { */ const int num_children = 1; const int batch_size = 5; + const size_t block_max_bytes = + 1 * 1024 * 1024; // 1MB, to avoid breaking early due to block size limit std::vector round; round.resize(num_children, 0); const int num_round = 1; @@ -449,7 +464,7 @@ TEST(SortMergerTest, TEST_SINGLE_STREAM) { const int limit = -1; const int offset = 0; merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, - limit, offset, profile.get())); + limit, offset, profile.get(), block_max_bytes)); } { std::vector child_block_suppliers; @@ -478,4 +493,265 @@ TEST(SortMergerTest, TEST_SINGLE_STREAM) { } } -} // namespace doris \ No newline at end of file +TEST(SortMergerTest, BLOCK_MAX_BYTES_LIMITS_OUTPUT) { + /** + * Test that block_max_bytes causes the merger to produce smaller blocks. + * Setup: 3 children, each providing 256 sorted Int64 rows. Total = 768 rows. + * batch_size=4096, block_max_bytes=512. + * Each nullable Int64 row ≈ 9 bytes. 
+ * The periodic check at 256 rows finds 256*9=2304 > 512, so it breaks. + * Each output block should have at most 256 rows. + */ + const int num_children = 3; + const int batch_size = 4096; + const size_t block_max_bytes = 512; + std::vector round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr merger; + auto profile = std::make_shared(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared(std::make_shared())); + { + std::vector is_asc_order = {true}; + std::vector nulls_first = {true}; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, -1, + 0, profile.get(), block_max_bytes)); + } + { + std::vector child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + BlockSupplier block_supplier = [&, round_vec = &round, num_round = num_round, + id = child_idx](Block* block, bool* eos) { + std::vector values; + std::vector null_map; + const int rows_per_block = 256; + for (int i = 0; i < rows_per_block; i++) { + values.push_back(id * rows_per_block + i); + null_map.push_back(0); + } + *block = ColumnHelper::create_nullable_block(values, null_map); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + if (block.rows() > 0) { + EXPECT_LE(block.rows(), 256) + << "block_max_bytes should limit output block size, got " << block.rows() + << " rows with " << block.bytes() << " bytes"; + total_rows += block.rows(); + num_blocks++; + } + } + EXPECT_EQ(total_rows, 768); + EXPECT_GT(num_blocks, 1); + } +} + +TEST(SortMergerTest, BLOCK_MAX_BYTES_ZERO_DISABLES_CHECK) { + /** + * When block_max_bytes=0, blocks are limited only by batch_size. 
+ */ + const int num_children = 2; + const int batch_size = 10; + std::vector round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr merger; + auto profile = std::make_shared(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared(std::make_shared())); + { + std::vector is_asc_order = {true}; + std::vector nulls_first = {true}; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, -1, + 0, profile.get(), /*block_max_bytes=*/0)); + } + { + std::vector child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + BlockSupplier block_supplier = [&, round_vec = &round, num_round = num_round, + id = child_idx](Block* block, bool* eos) { + std::vector values; + std::vector null_map; + for (int i = 0; i < 10; i++) { + values.push_back(id * 10 + i); + null_map.push_back(0); + } + *block = ColumnHelper::create_nullable_block(values, null_map); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + size_t total_rows = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + if (block.rows() > 0) { + // With block_max_bytes=0, rows per block should be exactly batch_size + // (except possibly the last block). + EXPECT_LE(block.rows(), batch_size); + total_rows += block.rows(); + } + } + // 2 children * 10 rows each = 20 total + EXPECT_EQ(total_rows, 20); + } +} + +TEST(SortMergerTest, BLOCK_MAX_BYTES_WITH_MANY_ROWS) { + /** + * Test block_max_bytes with 5 children each producing 1024 rows. + * Total = 5120 rows. batch_size=8192. block_max_bytes=2048. + * Verifies all rows are output and no block exceeds batch_size. 
+ */ + const int num_children = 5; + const int batch_size = 8192; + const size_t block_max_bytes = 2048; + const int rows_per_child = 1024; + std::vector supply_idx; + supply_idx.resize(num_children, 0); + + std::unique_ptr merger; + auto profile = std::make_shared(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared(std::make_shared())); + { + std::vector is_asc_order = {true}; + std::vector nulls_first = {false}; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, -1, + 0, profile.get(), block_max_bytes)); + } + { + std::vector child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + BlockSupplier block_supplier = [&, idx_ptr = &supply_idx, id = child_idx, + rows_per_child = rows_per_child](Block* block, + bool* eos) { + const int block_rows = 256; + int& idx = (*idx_ptr)[id]; + if (idx >= rows_per_child) { + *eos = true; + return Status::OK(); + } + int remaining = std::min(block_rows, rows_per_child - idx); + std::vector values; + std::vector null_map; + for (int i = 0; i < remaining; i++) { + values.push_back(idx + i); + null_map.push_back(0); + } + *block = ColumnHelper::create_nullable_block(values, null_map); + idx += remaining; + *eos = (idx >= rows_per_child); + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + size_t total_rows = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + if (block.rows() > 0) { + EXPECT_LE(block.rows(), static_cast(batch_size)); + total_rows += block.rows(); + } + } + EXPECT_EQ(total_rows, num_children * rows_per_child); + } +} + +TEST(SortMergerTest, SINGLE_RUN_FAST_PATH_BYTE_BUDGET) { + /** + * Single-run fast path with byte budget: 1 child providing 256 rows in one block. + * batch_size=4096, block_max_bytes=100. + * Each nullable Int64 row ≈ 9 bytes (8 data + 1 null). 
+ * byte_limited ≈ 100/9 ≈ 11 rows per output block. + * Verifies: + * - All rows are output (no data loss from cut on supplier block) + * - Multiple output blocks are produced (byte budget limits each block) + * - Data is sorted correctly + */ + const int batch_size = 4096; + const size_t block_max_bytes = 100; + const int total_rows = 256; + + std::unique_ptr merger; + auto profile = std::make_shared(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared(std::make_shared())); + { + std::vector is_asc_order = {true}; + std::vector nulls_first = {false}; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, -1, + 0, profile.get(), block_max_bytes)); + } + { + int supply_count = 0; + std::vector child_block_suppliers; + BlockSupplier block_supplier = [&supply_count](Block* block, bool* eos) { + std::vector values; + std::vector null_map; + for (int i = 0; i < 256; i++) { + values.push_back(i); + null_map.push_back(0); + } + *block = ColumnHelper::create_nullable_block(values, null_map); + supply_count++; + *eos = true; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + EXPECT_EQ(merger->_priority_queue.size(), 1) << "single child = single-run fast path"; + } + { + size_t total_output_rows = 0; + int num_blocks = 0; + Int64 last_value = -1; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + if (block.rows() > 0) { + num_blocks++; + // Verify sorted order and continuity + auto col = block.get_by_position(0).column; + auto nullable_col = + assert_cast(col.get())->get_nested_column_ptr(); + auto* data = assert_cast(nullable_col.get())->get_data().data(); + for (size_t i = 0; i < block.rows(); i++) { + EXPECT_EQ(data[i], last_value + 1) + << "row " << (total_output_rows + i) << " mismatch"; + last_value = data[i]; + } + total_output_rows += block.rows(); + } + } + 
EXPECT_EQ(total_output_rows, total_rows) << "all rows must be output (no data loss)"; + EXPECT_GT(num_blocks, 1) << "byte budget should produce multiple blocks"; + } +} + +} // namespace doris diff --git a/be/test/exec/exchange/block_serializer_test.cpp b/be/test/exec/exchange/block_serializer_test.cpp new file mode 100644 index 00000000000000..8b9c75371dd13b --- /dev/null +++ b/be/test/exec/exchange/block_serializer_test.cpp @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/config.h" +#include "core/block/block.h" +#include "core/data_type/data_type_number.h" +#include "exec/exchange/vdata_stream_sender.h" +#include "exec/operator/exchange_sink_operator.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_runtime_state.h" + +namespace doris { + +// Tests for BlockSerializer::next_serialized_block, focusing on the byte-budget +// breakout path introduced by the adaptive batch size feature. 
+class BlockSerializerTest : public ::testing::Test { +protected: + void SetUp() override { + _saved_adaptive = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + } + void TearDown() override { config::enable_adaptive_batch_size = _saved_adaptive; } + + bool _saved_adaptive = false; +}; + +// With a tight byte budget, accumulating small blocks must trigger serialization +// once the merged block bytes reach the budget. Without the byte budget this would +// only fire on row-count overflow or eos. +TEST_F(BlockSerializerTest, ByteBudgetTriggersSerialize) { + MockRuntimeState state; + // Large row cap + tiny byte budget so the test isolates the byte-driven path. + state._batch_size = 1 << 20; + state._query_options.__set_batch_size(state._batch_size); + state._query_options.__set_preferred_block_size_bytes(64); + + ExchangeSinkLocalState local_state(&state); + BlockSerializer serializer(&local_state, /*is_local=*/true); + + PBlock pblock; + bool serialized = false; + + // Push enough rows that the merged Int32 column exceeds 64 bytes. + // 16 Int32 values == 64 bytes of column data; merging triggers exceeded(). + for (int i = 0; i < 32; ++i) { + Block block = ColumnHelper::create_block({i}); + ASSERT_TRUE(serializer + .next_serialized_block(&block, &pblock, /*num_receivers=*/1, + &serialized, /*eos=*/false) + .ok()); + if (serialized) { + break; + } + } + EXPECT_TRUE(serialized) << "byte budget never triggered serialize"; + ASSERT_NE(serializer.get_block(), nullptr); + EXPECT_GE(serializer.get_block()->bytes(), 64U); +} + +// eos must always force serialize, regardless of budget state. 
+TEST_F(BlockSerializerTest, EosForcesSerialize) { + MockRuntimeState state; + state._batch_size = 1 << 20; + state._query_options.__set_batch_size(state._batch_size); + state._query_options.__set_preferred_block_size_bytes(1ULL << 30); + + ExchangeSinkLocalState local_state(&state); + BlockSerializer serializer(&local_state, /*is_local=*/true); + + Block block = ColumnHelper::create_block({1, 2, 3}); + PBlock pblock; + bool serialized = false; + ASSERT_TRUE( + serializer.next_serialized_block(&block, &pblock, 1, &serialized, /*eos=*/true).ok()); + EXPECT_TRUE(serialized); +} + +// When neither budget nor eos triggers, serialization must not happen so the +// upstream caller continues to accumulate. +TEST_F(BlockSerializerTest, NoTriggerLeavesUnserialized) { + MockRuntimeState state; + state._batch_size = 1 << 20; + state._query_options.__set_batch_size(state._batch_size); + state._query_options.__set_preferred_block_size_bytes(1ULL << 30); + + ExchangeSinkLocalState local_state(&state); + BlockSerializer serializer(&local_state, /*is_local=*/true); + + Block block = ColumnHelper::create_block({1, 2, 3}); + PBlock pblock; + bool serialized = true; // sentinel + ASSERT_TRUE( + serializer.next_serialized_block(&block, &pblock, 1, &serialized, /*eos=*/false).ok()); + EXPECT_FALSE(serialized); + ASSERT_NE(serializer.get_block(), nullptr); + EXPECT_EQ(serializer.get_block()->rows(), 3U); +} + +} // namespace doris diff --git a/be/test/exec/operator/agg_operator_test.cpp b/be/test/exec/operator/agg_operator_test.cpp index 945fd0f9f1fc81..cdacdcff0ae36d 100644 --- a/be/test/exec/operator/agg_operator_test.cpp +++ b/be/test/exec/operator/agg_operator_test.cpp @@ -716,4 +716,57 @@ TEST(AggOperatorTestWithOutGroupBy, other_case_3) { } } +// Verify aggregation source respects byte budget via BlockBudget::effective_max_rows. +// With a tiny byte budget, each output block should contain fewer rows than batch_size. 
+TEST_F(AggOperatorTestWithGroupBy, ByteBudgetLimitsOutputRows) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + + OperatorContext ctx; + // Set a large batch_size but tiny byte budget so effective_max_rows is small. + ctx.state._batch_size = 4096; + ctx.state._query_options.__set_batch_size(4096); + ctx.state._query_options.__set_preferred_block_size_bytes(1); + ctx.state._query_options.__set_batch_size(65535); + + auto sink_op = std::make_shared(); + sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator( + ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared()), + false, false)); + sink_op->_pool = &ctx.pool; + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); + sink_op->_probe_expr_ctxs = + MockSlotRef::create_mock_contexts(0, std::make_shared()); + + auto source_op = std::make_shared(); + source_op->mock_row_descriptor.reset(new MockRowDescriptor { + {std::make_shared(), std::make_shared()}, &ctx.pool}); + source_op->_without_key = false; + source_op->_needs_finalize = true; + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); + + auto shared_state = init_sink_and_source(sink_op, source_op, ctx); + + { + Block block { + ColumnHelper::create_column_with_name({1, 1, 2, 2, 2, 3}), + ColumnHelper::create_column_with_name({1, 1, 100, 100, 100, 1000})}; + EXPECT_TRUE(sink_op->sink(&ctx.state, &block, true).ok()); + } + + // Collect all output blocks — with a tiny byte budget the source should still produce + // all 3 distinct groups, possibly across multiple get_block calls. + size_t total_rows = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(source_op->get_block(&ctx.state, &block, &eos).ok()); + total_rows += block.rows(); + } + // All 3 groups must be output regardless of byte budget. 
+ EXPECT_EQ(total_rows, 3); + + config::enable_adaptive_batch_size = saved; +} + } // namespace doris diff --git a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp index 88434e47fd7e01..5b43fe32c34931 100644 --- a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp +++ b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp @@ -35,6 +35,8 @@ struct DistinctStreamingAggOperatorTest : public ::testing::Test { mock_op = std::make_shared(); state = std::make_shared(); state->_batch_size = 10; + state->_query_options.__set_batch_size(10); + state->_query_options.__set_batch_size(10); op->_child = mock_op; } @@ -194,4 +196,39 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { { EXPECT_TRUE(op->close(state.get())); } } +// Verify byte budget limits the output block size in distinct streaming aggregation. +TEST_F(DistinctStreamingAggOperatorTest, ByteBudgetLimitsOutputBlock) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + state->_batch_size = 4096; + state->_query_options.__set_batch_size(4096); + state->_query_options.__set_preferred_block_size_bytes(1); + state->_query_options.__set_batch_size(65535); + + op->_is_streaming_preagg = false; + create_op({std::make_shared()}, {std::make_shared()}); + + // Push data. With tiny byte budget, need_more_input_data should return false + // quickly since within_budget(rows, bytes) will become false when bytes >= 1. + { + auto block = ColumnHelper::create_block({1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + } + + // After push of 10 distinct values, need_more_input_data should be false + // because the aggregated block bytes exceed the tiny budget. + EXPECT_FALSE(op->need_more_input_data(state.get())); + + // Pull should return the aggregated block. 
+ { + Block block; + bool eos = false; + EXPECT_TRUE(op->pull(state.get(), &block, &eos)); + EXPECT_EQ(block.rows(), 10); + } + + EXPECT_TRUE(op->close(state.get())); + config::enable_adaptive_batch_size = saved; +} + } // namespace doris diff --git a/be/test/exec/operator/exchange_source_operator_test.cpp b/be/test/exec/operator/exchange_source_operator_test.cpp index 1f7abc4e00fb06..44e3d87508381a 100644 --- a/be/test/exec/operator/exchange_source_operator_test.cpp +++ b/be/test/exec/operator/exchange_source_operator_test.cpp @@ -44,7 +44,7 @@ struct MOCKVDataStreamRecvr : public VDataStreamRecvr { Status create_merger(const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first, size_t batch_size, int64_t limit, - size_t offset) override { + size_t offset, size_t block_max_bytes = 0) override { return Status::OK(); } void set_block(std::vector dates) { diff --git a/be/test/exec/operator/hashjoin_probe_operator_test.cpp b/be/test/exec/operator/hashjoin_probe_operator_test.cpp index 9828744a58f0dd..59c2953c858e55 100644 --- a/be/test/exec/operator/hashjoin_probe_operator_test.cpp +++ b/be/test/exec/operator/hashjoin_probe_operator_test.cpp @@ -1251,4 +1251,114 @@ TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) { Field::create_field(1)}); } +// Regression for the build-side filtering in ProcessHashTableProbe::_init_probe_side: +// LEFT_SEMI joins do not output build columns, so the byte budget pre-estimate must +// not include the build-side bytes-per-row contribution. Verify the LEFT_SEMI output +// is still correct under a tight byte budget that exercises the new code path. 
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinWithAdaptiveBatchSize) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _helper.runtime_state->_query_options.__set_preferred_block_size_bytes(64); + _helper.runtime_state->_query_options.__set_batch_size(65535); + + auto sink_block = ColumnHelper::create_block({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector build_blocks = {sink_block}; + std::vector probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + config::enable_adaptive_batch_size = saved; + + // Same expected matches as the LeftSemiJoin test: probe rows (3,"c") and (4,"d"). + auto sorted_block = sort_block_by_columns(output_block); + ASSERT_EQ(sorted_block.rows(), 2); + check_column_values(*sorted_block.get_by_position(0).column, + {Field::create_field(3), Field::create_field(4)}); + check_column_values( + *sorted_block.get_by_position(1).column, + {Field::create_field("c"), Field::create_field("d")}); +} + +// Symmetric coverage for the probe-side filter under a tight byte budget: RIGHT_SEMI +// joins do not output probe columns, so the pre-estimate path runs without the +// probe-side bytes-per-row contribution. 
+TEST_F(HashJoinProbeOperatorTest, RightSemiJoinWithAdaptiveBatchSize) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _helper.runtime_state->_query_options.__set_preferred_block_size_bytes(64); + _helper.runtime_state->_query_options.__set_batch_size(65535); + + auto sink_block = ColumnHelper::create_block({1, 2, 3, 4}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name( + {"a", "b", "c", "d"}, {0, 0, 0, 0})); + + auto probe_block = ColumnHelper::create_nullable_block({1, 1, 2, 3, 4, 5}, + {0, 0, 0, 0, 0, 0}); + probe_block.insert( + ColumnHelper::create_column_with_name({"a", "a", "b", "c", "d", "e"})); + + Block output_block; + std::vector build_blocks = {sink_block}; + std::vector probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + config::enable_adaptive_batch_size = saved; + + // RIGHT_SEMI returns build-side rows that have at least one probe match. + auto sorted_block = sort_block_by_columns(output_block); + ASSERT_EQ(sorted_block.rows(), 4); + check_column_values(*sorted_block.get_by_position(0).column, + {Field::create_field(1), Field::create_field(2), + Field::create_field(3), Field::create_field(4)}); + check_column_values( + *sorted_block.get_by_position(1).column, + {Field::create_field("a"), Field::create_field("b"), + Field::create_field("c"), Field::create_field("d")}); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinWithAdaptiveBatchSize) { + // Enable adaptive batch size with a small byte budget to exercise the batch adjustment code. 
+ auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _helper.runtime_state->_query_options.__set_preferred_block_size_bytes(512); + _helper.runtime_state->_query_options.__set_batch_size(65535); + + auto sink_block = ColumnHelper::create_block({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block({1, 2, 3, 4, 5}, {0, 0, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector build_blocks = {sink_block}; + std::vector probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + config::enable_adaptive_batch_size = saved; + + // With adaptive byte budget, result correctness is preserved. + // Build key (1,null) won't match probe (1,"a"), (2,"b")→(2,"b"), (3,"c")→(3,"c"), + // (4,"d")→(4,"d"), probe (null,"e") won't match → 3 output rows. + ASSERT_EQ(output_block.rows(), 3); + Block sorted_block = sort_block_by_columns(output_block); + + check_column_values(*sorted_block.get_by_position(0).column, + {Field::create_field(2), Field::create_field(3), + Field::create_field(4)}); +} + } // namespace doris \ No newline at end of file diff --git a/be/test/exec/operator/set_operator_test.cpp b/be/test/exec/operator/set_operator_test.cpp index 7e3bcfef30d3be..f8ebea809fdc67 100644 --- a/be/test/exec/operator/set_operator_test.cpp +++ b/be/test/exec/operator/set_operator_test.cpp @@ -688,4 +688,43 @@ TEST_F(ExceptOperatorTest, test_refresh_hash_table) { EXPECT_TRUE(block.empty()); } } -} // namespace doris \ No newline at end of file + +// Verify set source respects byte budget via BlockBudget::effective_max_rows. 
+TEST_F(ExceptOperatorTest, ByteBudgetLimitsOutputRows) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + state->_batch_size = 4096; + state->_query_options.__set_preferred_block_size_bytes(1); + state->_query_options.__set_batch_size(65535); + + init_op(2, {std::make_shared(std::make_shared())}); + sink_op->_child_exprs = MockSlotRef::create_mock_contexts( + DataTypes {std::make_shared(std::make_shared())}); + probe_sink_ops[0]->_child_exprs = MockSlotRef::create_mock_contexts( + DataTypes {std::make_shared(std::make_shared())}); + init_local_state(); + + { + Block block = ColumnHelper::create_nullable_block( + {1, 2, 3, 4, 5}, {false, false, false, false, false}); + EXPECT_TRUE(sink_op->sink(state.get(), &block, true)); + } + { + Block block = ColumnHelper::create_nullable_block({}, {}); + EXPECT_TRUE(probe_sink_ops[0]->sink(states[0].get(), &block, true)); + } + + // Collect all output — byte budget is tiny so blocks may be split. + size_t total_rows = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos)); + total_rows += block.rows(); + } + // All 5 rows must be returned (no probe-side matches ⇒ EXCEPT returns all). + EXPECT_EQ(total_rows, 5); + + config::enable_adaptive_batch_size = saved; +} +} // namespace doris diff --git a/be/test/exec/operator/streaming_agg_operator_test.cpp b/be/test/exec/operator/streaming_agg_operator_test.cpp index 0421d58bfd256b..926652d22d5fe6 100644 --- a/be/test/exec/operator/streaming_agg_operator_test.cpp +++ b/be/test/exec/operator/streaming_agg_operator_test.cpp @@ -395,4 +395,60 @@ TEST_F(StreamingAggOperatorTest, test4) { { EXPECT_TRUE(local_state->close(state.get()).ok()); } } +// Verify streaming aggregation source respects byte budget via BlockBudget::effective_max_rows. 
+TEST_F(StreamingAggOperatorTest, ByteBudgetLimitsEffectiveMaxRows) {
+    auto saved = config::enable_adaptive_batch_size;
+    config::enable_adaptive_batch_size = true;
+    state->_batch_size = 4096;
+    // (batch_size query option is set exactly once, below; a redundant __set_batch_size(4096) was removed)
+    state->_query_options.__set_preferred_block_size_bytes(1);
+    state->_query_options.__set_batch_size(65535);
+
+    op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
+            pool, MockSlotRef::create_mock_contexts(1, std::make_shared()), false,
+            false));
+    op->_pool = &pool;
+    op->_needs_finalize = true;
+
+    EXPECT_TRUE(op->set_child(child_op));
+    EXPECT_TRUE(op->prepare(state.get()).ok());
+    op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared());
+
+    {
+        auto ls = std::make_unique(state.get(), op.get());
+        LocalStateInfo info {.parent_profile = &profile,
+                             .scan_ranges = {},
+                             .shared_state = nullptr,
+                             .shared_state_map = {},
+                             .task_idx = 0};
+        EXPECT_TRUE(ls->init(state.get(), info).ok());
+        state->resize_op_id_to_local_state(-100);
+        state->emplace_local_state(op->operator_id(), std::move(ls));
+    }
+
+    local_state =
+            static_cast(state->get_local_state(op->operator_id()));
+    EXPECT_TRUE(local_state->open(state.get()).ok());
+
+    {
+        Block block {
+                ColumnHelper::create_column_with_name({1, 1, 2, 2, 2, 3}),
+                ColumnHelper::create_column_with_name({1, 1, 100, 100, 100, 1000})};
+        EXPECT_TRUE(op->push(state.get(), &block, true).ok());
+    }
+
+    // Pull once — with tiny byte budget but _estimated_row_bytes still 0, effective_max_rows
+    // falls back to block_max_rows=4096, so all 3 groups should be returned in one call.
+ { + Block block; + bool eos = false; + EXPECT_TRUE(op->pull(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 3); + EXPECT_TRUE(eos); + } + + EXPECT_TRUE(local_state->close(state.get()).ok()); + config::enable_adaptive_batch_size = saved; +} + } // namespace doris diff --git a/be/test/exec/operator/table_function_operator_test.cpp b/be/test/exec/operator/table_function_operator_test.cpp index e20014ca9a817d..dd10b78d6d4993 100644 --- a/be/test/exec/operator/table_function_operator_test.cpp +++ b/be/test/exec/operator/table_function_operator_test.cpp @@ -414,6 +414,7 @@ TEST_F(TableFunctionOperatorTest, block_fast_path_explode) { TEST_F(TableFunctionOperatorTest, block_fast_path_explode_batch_truncate) { state->_batch_size = 2; + state->_query_options.__set_batch_size(2); bool get_value_called = false; auto int_type = std::make_shared(); auto arr_type = std::make_shared(int_type); @@ -657,6 +658,7 @@ TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_array_misalig TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_array_partial_gap_uses_slow_path) { state->_batch_size = 2; + state->_query_options.__set_batch_size(2); bool get_value_called = false; auto int_type = std::make_shared(); auto arr_type = std::make_shared(std::make_shared(int_type)); @@ -2162,4 +2164,49 @@ TEST_F(UnnestTest, posexplode_with_nulls_fast_path) { EXPECT_EQ(val_col->get_element(2), 30); } } + +// Verify table function operator applies a BlockBudget derived from the runtime state's +// row/byte limits when byte budget is set. This is a smoke test that the adaptive batch +// size config path doesn't crash. 
+TEST_F(TableFunctionOperatorTest, ByteBudgetLimitsOutputRows) { + auto saved = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + state->_batch_size = 4096; + state->_query_options.__set_preferred_block_size_bytes(1); + state->_query_options.__set_batch_size(65535); + + bool get_value_called = false; + auto array_type = std::make_shared(std::make_shared()); + auto output_type = std::make_shared(); + prepare_fast_explode_operator(array_type, output_type, &get_value_called); + + // Push a single-row block with a small array. + { + auto int_col = ColumnHelper::create_column({1}); + auto inner_col = ColumnInt32::create(); + auto offsets = ColumnArray::ColumnOffsets::create(); + inner_col->insert_value(10); + inner_col->insert_value(20); + offsets->insert_value(2); + + auto array_col = ColumnArray::create(std::move(inner_col), std::move(offsets)); + Block block; + block.insert({int_col->get_ptr(), std::make_shared(), "col0"}); + block.insert({std::move(array_col), array_type, "col1"}); + push_child_block(std::move(block)); + } + + // Single pull — verify byte budget limits output. With max_bytes=1, the budget + // is exceeded after the first exploded row, so we get 1 row instead of 2. 
+    {
+        Block block;
+        bool eos = false;
+        auto st = op->pull(state.get(), &block, &eos);
+        EXPECT_TRUE(st) << st.msg();
+        EXPECT_GE(block.rows(), 1);
+        EXPECT_LE(block.rows(), 2);
+    }
+
+    config::enable_adaptive_batch_size = saved;
+}
 } // namespace doris
diff --git a/be/test/exec/operator/union_operator_test.cpp b/be/test/exec/operator/union_operator_test.cpp
index 3009e977a47d8f..8287cc5d9a6e2e 100644
--- a/be/test/exec/operator/union_operator_test.cpp
+++ b/be/test/exec/operator/union_operator_test.cpp
@@ -76,6 +76,7 @@ struct UnionOperatorTest : public ::testing::Test {
 
 TEST_F(UnionOperatorTest, test_all_const_expr) {
     state->_batch_size = 2;
+    state->_query_options.__set_batch_size(2);
     source_op.reset(new MockUnionSourceOperator {
             0,
             {std::make_shared(), std::make_shared(),
@@ -255,6 +256,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
     {
         for (int i = 0; i < child_size; i++) {
             sink_state[i]->_batch_size = 2;
+            sink_state[i]->_query_options.__set_batch_size(2);
             Block block = ColumnHelper::create_block({1, 2}, {3, 4});
             EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
         }
@@ -291,4 +293,56 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
                 block, ColumnHelper::create_block({1, 2}, {3, 4})));
         }
     }
-} // namespace doris
\ No newline at end of file
+
+// Verify union source const-expr path respects byte budget via within_budget.
+// With a tiny byte budget, each get_block should output fewer const rows.
+TEST_F(UnionOperatorTest, ByteBudgetLimitsConstExprOutput) {
+    auto saved = config::enable_adaptive_batch_size;
+    config::enable_adaptive_batch_size = true;
+    state->_batch_size = 2;
+    state->_query_options.__set_batch_size(2);
+    state->_query_options.__set_preferred_block_size_bytes(1);
+    // (a duplicate __set_batch_size(2) was removed — the option is already set above)
+
+    source_op.reset(new MockUnionSourceOperator {
+            0, // child_size=0 means const-only
+            {std::make_shared(), std::make_shared()},
+            &pool});
+    EXPECT_TRUE(source_op->prepare(state.get()));
+    source_op->_const_expr_lists.push_back(MockLiteral::create({1, 10}));
+    source_op->_const_expr_lists.push_back(MockLiteral::create({2, 20}));
+    source_op->_const_expr_lists.push_back(MockLiteral::create({3, 30}));
+
+    auto shared_state_sptr = std::make_shared(0);
+    {
+        auto source_local_state_uptr =
+                std::make_unique(state.get(), source_op.get());
+        source_local_state = source_local_state_uptr.get();
+        LocalStateInfo info {.parent_profile = &profile,
+                             .scan_ranges = {},
+                             .shared_state = shared_state_sptr.get(),
+                             .shared_state_map = {},
+                             .task_idx = 0};
+        EXPECT_TRUE(source_local_state->init(state.get(), info));
+        state->resize_op_id_to_local_state(-100);
+        state->emplace_local_state(source_op->operator_id(), std::move(source_local_state_uptr));
+        EXPECT_TRUE(source_local_state->open(state.get()));
+    }
+
+    // Pull all const expr output. With tiny byte budget and batch_size=2,
+    // each get_block returns at most 2 const rows, and once bytes exceed 1, it stops.
+ size_t total_rows = 0; + int calls = 0; + bool eos = false; + while (!eos && calls < 100) { + Block block; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + total_rows += block.rows(); + calls++; + } + EXPECT_EQ(total_rows, 3); + EXPECT_TRUE(eos); + + config::enable_adaptive_batch_size = saved; +} +} // namespace doris diff --git a/be/test/exec/sort/full_sort_test.cpp b/be/test/exec/sort/full_sort_test.cpp index e182048c807dad..0aab266aed769b 100644 --- a/be/test/exec/sort/full_sort_test.cpp +++ b/be/test/exec/sort/full_sort_test.cpp @@ -26,6 +26,7 @@ #include #include +#include "common/config.h" #include "common/object_pool.h" #include "core/assert_cast.h" #include "core/block/block.h" @@ -113,4 +114,85 @@ TEST_F(FullSorterTest, test_full_sorter3) { EXPECT_EQ(sorter->_state->get_sorted_block()[1]->rows(), 4); } +TEST_F(FullSorterTest, test_get_next_block_max_bytes) { + // Test that FullSorter::get_next respects block_max_bytes via RuntimeState. + // Must create multiple sorted blocks (via explicit do_sort) so the merge tree + // has multiple cursors and the bytes check fires between iterations. 
+ auto saved_adaptive = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _state._query_options.__set_batch_size(1000); + _state._query_options.__set_preferred_block_size_bytes(50); + + sorter = FullSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, + *row_desc, &_state, nullptr); + sorter->init_profile(&_profile); + { + Block block = ColumnHelper::create_block({10, 9, 8, 7, 6, 5, 4, 3, 2, 1}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + EXPECT_TRUE(sorter->do_sort()); + } + { + Block block = ColumnHelper::create_block({15, 14, 13, 12, 11}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + EXPECT_TRUE(sorter->do_sort()); + } + EXPECT_TRUE(sorter->prepare_for_read(false).ok()); + + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(sorter->get_next(&_state, &block, &eos).ok()); + if (block.rows() > 0) { + total_rows += block.rows(); + num_blocks++; + } + } + EXPECT_EQ(total_rows, 15); + EXPECT_GT(num_blocks, 1) << "block_max_bytes should cause multiple output blocks"; + + config::enable_adaptive_batch_size = saved_adaptive; +} + +TEST_F(FullSorterTest, test_merge_sort_read_for_spill_block_max_bytes) { + // Test that merge_sort_read_for_spill passes block_max_bytes from RuntimeState. 
+ auto saved_adaptive = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _state._query_options.__set_batch_size(1000); + _state._query_options.__set_preferred_block_size_bytes(50); + + sorter = FullSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, + *row_desc, &_state, nullptr); + sorter->init_profile(&_profile); + { + Block block = ColumnHelper::create_block({10, 9, 8, 7, 6, 5, 4, 3, 2, 1}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + EXPECT_TRUE(sorter->do_sort()); + } + { + Block block = ColumnHelper::create_block({15, 14, 13, 12, 11}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + EXPECT_TRUE(sorter->do_sort()); + } + EXPECT_TRUE(sorter->prepare_for_read(false).ok()); + + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(sorter->merge_sort_read_for_spill(&_state, &block, 1000, &eos).ok()); + if (block.rows() > 0) { + total_rows += block.rows(); + num_blocks++; + } + } + EXPECT_EQ(total_rows, 15); + EXPECT_GT(num_blocks, 1) + << "merge_sort_read_for_spill should respect block_max_bytes from state"; + + config::enable_adaptive_batch_size = saved_adaptive; +} + } // namespace doris \ No newline at end of file diff --git a/be/test/exec/sort/merge_sorter_state.cpp b/be/test/exec/sort/merge_sorter_state.cpp index 0dc8a1a8937164..753b7b7184a781 100644 --- a/be/test/exec/sort/merge_sorter_state.cpp +++ b/be/test/exec/sort/merge_sorter_state.cpp @@ -77,7 +77,7 @@ TEST_F(MergeSorterStateTest, test1) { { Block block; bool eos = false; - Status status = state->merge_sort_read(&block, 2, &eos); + Status status = state->merge_sort_read(&block, 2, /*block_max_bytes=*/0, &eos); EXPECT_TRUE(status.ok()); EXPECT_TRUE(ColumnHelper::block_equal(block, ColumnHelper::create_block({1, 2}))); @@ -86,7 +86,7 @@ TEST_F(MergeSorterStateTest, test1) { { Block block; bool eos = false; - Status status = state->merge_sort_read(&block, 2, &eos); + 
Status status = state->merge_sort_read(&block, 2, /*block_max_bytes=*/0, &eos); EXPECT_TRUE(status.ok()); EXPECT_TRUE(ColumnHelper::block_equal(block, ColumnHelper::create_block({3, 4}))); @@ -95,10 +95,67 @@ TEST_F(MergeSorterStateTest, test1) { { Block block; bool eos = false; - Status status = state->merge_sort_read(&block, 2, &eos); + Status status = state->merge_sort_read(&block, 2, /*block_max_bytes=*/0, &eos); EXPECT_TRUE(status.ok()); EXPECT_TRUE(ColumnHelper::block_equal(block, ColumnHelper::create_block({5, 6}))); } } -} // namespace doris \ No newline at end of file + +TEST_F(MergeSorterStateTest, BLOCK_MAX_BYTES_LIMITS_OUTPUT) { + // With block_max_bytes set small, merge_sort_read should produce smaller blocks. + // Use multiple sorted blocks so the merge loop iterates multiple times. + // Each sorted block: 10 Int64 rows (80 bytes). 10 blocks = 100 rows total. + // block_max_bytes=50 triggers the break after consuming the first cursor batch. + state.reset(new MergeSorterState(*row_desc, 0)); + for (int b = 0; b < 10; b++) { + std::vector values; + for (int i = 0; i < 10; i++) { + values.push_back(b * 10 + i); + } + state->add_sorted_block(create_block(values)); + } + EXPECT_EQ(state->num_rows(), 100); + + SortDescription desc {SortColumnDescription {0, 1, -1}}; + EXPECT_TRUE(state->build_merge_tree(desc)); + + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + Status status = state->merge_sort_read(&block, 100, /*block_max_bytes=*/50, &eos); + EXPECT_TRUE(status.ok()); + if (block.rows() > 0) { + total_rows += block.rows(); + num_blocks++; + } + } + EXPECT_EQ(total_rows, 100); + EXPECT_GT(num_blocks, 1) << "Should produce multiple blocks due to bytes limit"; +} + +TEST_F(MergeSorterStateTest, BLOCK_MAX_BYTES_ZERO_DISABLES_CHECK) { + // block_max_bytes=0 should not limit — all rows in one call (but eos follows). 
+ state.reset(new MergeSorterState(*row_desc, 0)); + state->add_sorted_block(create_block({1, 2, 3, 4, 5})); + state->add_sorted_block(create_block({6, 7, 8, 9, 10})); + + SortDescription desc {SortColumnDescription {0, 1, -1}}; + EXPECT_TRUE(state->build_merge_tree(desc)); + + Block block; + bool eos = false; + Status status = state->merge_sort_read(&block, 100, /*block_max_bytes=*/0, &eos); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(block.rows(), 10) << "block_max_bytes=0 should not limit rows"; + // eos is set to (merged_rows == 0), so after reading 10 rows, eos=false + // Next call will return 0 rows with eos=true + EXPECT_FALSE(eos); + Block block2; + status = state->merge_sort_read(&block2, 100, /*block_max_bytes=*/0, &eos); + EXPECT_TRUE(status.ok()); + EXPECT_TRUE(eos); +} +} // namespace doris diff --git a/be/test/exec/sort/partition_sorter_test.cpp b/be/test/exec/sort/partition_sorter_test.cpp index 8bf863e840c873..8872f859c0d2cc 100644 --- a/be/test/exec/sort/partition_sorter_test.cpp +++ b/be/test/exec/sort/partition_sorter_test.cpp @@ -28,6 +28,7 @@ #include #include +#include "common/config.h" #include "common/object_pool.h" #include "core/assert_cast.h" #include "core/block/block.h" @@ -187,4 +188,98 @@ TEST_F(PartitionSorterTest, test_partition_sorter_RANK) { sorter->reset_sorter_state(&_state); } +TEST_F(PartitionSorterTest, test_partition_sorter_ROW_NUMBER_block_max_bytes) { + // Test that block_max_bytes limits output block size for ROW_NUMBER. + // With 16 rows of Int64 (8 bytes each), total ~128 bytes of column data. + // Set block_max_bytes small so the output is split into multiple blocks. 
+ auto saved_adaptive = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _state._query_options.__set_batch_size(1000); + _state._query_options.__set_preferred_block_size_bytes(50); + + sorter = PartitionSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, + nulls_first, *row_desc, &_state, nullptr, false, 20, + TopNAlgorithm::ROW_NUMBER, nullptr); + sorter->init_profile(&_profile); + { + Block block = ColumnHelper::create_block({10, 9, 8, 7, 6, 5, 4, 3, 2, 1}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + } + { + Block block = ColumnHelper::create_block({4, 5, 6, 7}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + } + { + Block block = ColumnHelper::create_block({100, 111}); + EXPECT_TRUE(sorter->append_block(&block).ok()); + } + { + auto st = sorter->prepare_for_read(false); + EXPECT_TRUE(st.ok()) << st.msg(); + } + + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(sorter->get_next(&_state, &block, &eos).ok()); + if (block.rows() > 0) { + total_rows += block.rows(); + num_blocks++; + } + } + // partition_inner_limit=20, but only 16 rows total + EXPECT_EQ(total_rows, 16); + EXPECT_GT(num_blocks, 1) << "block_max_bytes should cause multiple output blocks"; + + config::enable_adaptive_batch_size = saved_adaptive; +} + +TEST_F(PartitionSorterTest, test_partition_sorter_RANK_block_max_bytes) { + // Test that block_max_bytes limits output for RANK algorithm. + // The RANK path checks bytes every 256 rows, so we need > 256 rows to trigger. 
+ auto saved_adaptive = config::enable_adaptive_batch_size; + config::enable_adaptive_batch_size = true; + _state._query_options.__set_batch_size(2000); + _state._query_options.__set_preferred_block_size_bytes(50); + + SortCursorCmp previous_row; + sorter = PartitionSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, + nulls_first, *row_desc, &_state, nullptr, false, 1000, + TopNAlgorithm::RANK, &previous_row); + sorter->init_profile(&_profile); + + // Add 600 distinct rows so bytes check at row 256 and 512 can trigger + { + std::vector values; + for (int i = 0; i < 600; i++) { + values.push_back(i); + } + Block block = ColumnHelper::create_block(values); + EXPECT_TRUE(sorter->append_block(&block).ok()); + } + { + auto st = sorter->prepare_for_read(false); + EXPECT_TRUE(st.ok()) << st.msg(); + } + + size_t total_rows = 0; + int num_blocks = 0; + bool eos = false; + while (!eos) { + Block block; + EXPECT_TRUE(sorter->get_next(&_state, &block, &eos).ok()); + if (block.rows() > 0) { + total_rows += block.rows(); + num_blocks++; + } + } + EXPECT_EQ(total_rows, 600); + EXPECT_GT(num_blocks, 1) << "block_max_bytes should cause multiple output blocks in RANK path"; + + config::enable_adaptive_batch_size = saved_adaptive; + sorter->reset_sorter_state(&_state); +} + } // namespace doris \ No newline at end of file