Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions be/src/exec/common/hash_table/join_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,25 @@ class JoinHashTable {
_keep_null_key = keep_null_key;
}

// batch_size_limit: when > 0, overrides max_batch_size for this call only (thread-safe).
template <int JoinOpType>
auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
uint32_t* __restrict build_idxs, const uint8_t* null_map,
bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct) {
bool with_other_conjuncts, bool is_mark_join, bool has_mark_join_conjunct,
int batch_size_limit) {
if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
_empty_build_side) {
return _process_null_aware_left_half_join_for_empty_build_side<JoinOpType>(
probe_idx, probe_rows, probe_idxs, build_idxs);
probe_idx, probe_rows, probe_idxs, build_idxs, batch_size_limit);
}

if (with_other_conjuncts) {
return _find_batch_conjunct<JoinOpType, false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs);
return _find_batch_conjunct<JoinOpType, false>(keys, build_idx_map, probe_idx,
build_idx, probe_rows, probe_idxs,
build_idxs, batch_size_limit);
}

if (is_mark_join) {
Expand All @@ -130,35 +133,39 @@ class JoinHashTable {
if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) {
return _find_batch_conjunct<JoinOpType, true>(keys, build_idx_map, probe_idx,
build_idx, probe_rows, probe_idxs,
build_idxs);
build_idxs, batch_size_limit);
}

return _find_batch_conjunct<JoinOpType, false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs);
return _find_batch_conjunct<JoinOpType, false>(keys, build_idx_map, probe_idx,
build_idx, probe_rows, probe_idxs,
build_idxs, batch_size_limit);
}

if (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
return _find_batch_inner_outer_join<JoinOpType>(keys, build_idx_map, probe_idx,
build_idx, probe_rows, probe_idxs,
probe_visited, build_idxs);
return _find_batch_inner_outer_join<JoinOpType>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs,
probe_visited, build_idxs, batch_size_limit);
}
// ASOF JOIN: for each probe row, find one matching build row (the closest match)
// The actual closest match logic is handled in ProcessHashTableProbe
if (JoinOpType == TJoinOp::ASOF_LEFT_INNER_JOIN ||
JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN) {
// Use conjunct path to get all matching rows, then filter in ProcessHashTableProbe
return _find_batch_conjunct<JoinOpType, false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs);
return _find_batch_conjunct<JoinOpType, false>(keys, build_idx_map, probe_idx,
build_idx, probe_rows, probe_idxs,
build_idxs, batch_size_limit);
}
if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (null_map) {
return _find_batch_left_semi_anti<JoinOpType, true>(
keys, build_idx_map, probe_idx, probe_rows, probe_idxs, null_map);
return _find_batch_left_semi_anti<JoinOpType, true>(keys, build_idx_map, probe_idx,
probe_rows, probe_idxs,
null_map, batch_size_limit);
} else {
return _find_batch_left_semi_anti<JoinOpType, false>(
keys, build_idx_map, probe_idx, probe_rows, probe_idxs, nullptr);
return _find_batch_left_semi_anti<JoinOpType, false>(keys, build_idx_map, probe_idx,
probe_rows, probe_idxs,
nullptr, batch_size_limit);
}
}
if (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
Expand All @@ -178,27 +185,27 @@ class JoinHashTable {
* select 'a' not in ('b', null) => null => 'a' != 'b' and 'a' != null => true and null => null
* select 'a' not in ('a', 'b', null) => false
*/
auto find_null_aware_with_other_conjuncts(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs,
uint8_t* __restrict null_flags,
bool picking_null_keys, const uint8_t* null_map) {
auto find_null_aware_with_other_conjuncts(
const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx,
uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys,
const uint8_t* null_map, int batch_size_limit) {
if (null_map) {
return _find_null_aware_with_other_conjuncts_impl<true>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs,
null_flags, picking_null_keys, null_map);
null_flags, picking_null_keys, null_map, batch_size_limit);
} else {
return _find_null_aware_with_other_conjuncts_impl<false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs,
null_flags, picking_null_keys, nullptr);
null_flags, picking_null_keys, nullptr, batch_size_limit);
}
}

// batch_size_limit: when > 0, overrides max_batch_size for this call only.
template <int JoinOpType, bool is_mark_join>
bool iterate_map(ColumnOffset32& build_idxs, ColumnFilterHelper* mark_column_helper) const {
const auto batch_size = max_batch_size;
bool iterate_map(ColumnOffset32& build_idxs, ColumnFilterHelper* mark_column_helper,
int batch_size_limit = 0) const {
const auto batch_size = batch_size_limit > 0 ? batch_size_limit : max_batch_size;
const auto elem_num = visited.size();
int count = 0;
build_idxs.resize(batch_size);
Expand Down Expand Up @@ -244,15 +251,16 @@ class JoinHashTable {
template <int JoinOpType>
auto _process_null_aware_left_half_join_for_empty_build_side(int probe_idx, int probe_rows,
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs) {
uint32_t* __restrict build_idxs,
int effective_batch_size) {
if (JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"process_null_aware_left_half_join_for_empty_build_side meet invalid "
"hash join input");
}
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
const auto batch_size = effective_batch_size;

while (probe_idx < probe_rows && matched_cnt < batch_size) {
probe_idxs[matched_cnt] = probe_idx++;
Expand Down Expand Up @@ -293,9 +301,9 @@ class JoinHashTable {
auto _find_batch_left_semi_anti(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map, int probe_idx,
int probe_rows, uint32_t* __restrict probe_idxs,
const uint8_t* null_map) {
const uint8_t* null_map, int effective_batch_size) {
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
const auto batch_size = effective_batch_size;

while (probe_idx < probe_rows && matched_cnt < batch_size) {
if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && has_null_map) {
Expand All @@ -320,9 +328,10 @@ class JoinHashTable {
template <int JoinOpType, bool only_need_to_match_one>
auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) {
uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs,
int effective_batch_size) {
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
const auto batch_size = effective_batch_size;

auto do_the_probe = [&]() {
while (build_idx && matched_cnt < batch_size) {
Expand Down Expand Up @@ -375,9 +384,9 @@ class JoinHashTable {
const uint32_t* __restrict build_idx_map, int probe_idx,
uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
uint32_t* __restrict build_idxs) {
uint32_t* __restrict build_idxs, int effective_batch_size) {
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
const auto batch_size = effective_batch_size;

auto do_the_probe = [&]() {
while (build_idx && matched_cnt < batch_size) {
Expand Down Expand Up @@ -429,9 +438,9 @@ class JoinHashTable {
const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx,
uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys,
const uint8_t* null_map) {
const uint8_t* null_map, int effective_batch_size) {
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
const auto batch_size = effective_batch_size;

auto do_the_probe = [&]() {
/// If no any rows match the probe key, here start to handle null keys in build side.
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/exchange/local_exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

namespace doris {

LocalExchangeSourceLocalState::LocalExchangeSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent) {}

Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/exchange/local_exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch
public:
using Base = PipelineXLocalState<LocalExchangeSharedState>;
ENABLE_FACTORY_CREATOR(LocalExchangeSourceLocalState);
LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent) {}
LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status close(RuntimeState* state) override;
Expand Down
7 changes: 5 additions & 2 deletions be/src/exec/exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "exec/exchange/local_exchange_sink_operator.h"
#include "exec/exchange/local_exchange_source_operator.h"
#include "exec/partitioner/partitioner.h"

namespace doris {
template <typename BlockType>
void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
Expand Down Expand Up @@ -155,7 +154,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos,
auto block_wrapper = partitioned_block.first;
RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start,
offset_start + partitioned_block.second.length));
} while (mutable_block.rows() < state->batch_size() && !*eos &&
if (source_info.local_state->block_budget().exceeded(mutable_block.rows(),
mutable_block.bytes())) {
break;
}
} while (mutable_block.rows() < source_info.local_state->block_budget().max_rows && !*eos &&
_dequeue_data(source_info.local_state, partitioned_block, eos, block,
source_info.channel_id));
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/exchange/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,13 @@ VDataStreamRecvr::~VDataStreamRecvr() {
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, size_t batch_size,
int64_t limit, size_t offset) {
int64_t limit, size_t offset, size_t block_max_bytes) {
DCHECK(_is_merging);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
std::vector<BlockSupplier> child_block_suppliers;
// Create the merger that will a single stream of sorted rows.
_merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
offset, _profile));
offset, _profile, block_max_bytes));

for (int i = 0; i < _sender_queues.size(); ++i) {
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exchange/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
MOCK_FUNCTION Status create_merger(const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, size_t batch_size,
int64_t limit, size_t offset);
int64_t limit, size_t offset, size_t block_max_bytes);

std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }

Expand Down
12 changes: 10 additions & 2 deletions be/src/exec/exchange/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ Status Channel::close(RuntimeState* state) {
}

BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local)
: _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
: _parent(parent),
_is_local(is_local),
_budget(parent->state()->batch_size(), parent->state()->preferred_block_size_bytes()) {}

Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers,
bool* serialized, bool eos, const uint32_t* data,
Expand All @@ -309,7 +311,13 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t
}
}

if (_mutable_block->rows() >= _batch_size || eos ||
// Two thresholds intentionally coexist:
// - _budget caps the *target* output block size (rows + bytes), shaping the
// average serialized batch sent downstream.
// - _buffer_mem_limit is the dynamic back-pressure cap propagated from
// Channel::set_buffer_mem_limit(); when the in-flight buffer grows past it
// we must flush regardless of the budget, to keep memory bounded.
if (_budget.exceeded(_mutable_block->rows(), _mutable_block->bytes()) || eos ||
(_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit)) {
if (!_is_local) {
RETURN_IF_ERROR(_serialize_block(dest, num_receivers));
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/exchange/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "runtime/runtime_profile.h"
#include "service/backend_options.h"
#include "storage/tablet_info.h"
#include "util/block_budget.h"
#include "util/brpc_closure.h"
#include "util/uid_util.h"

Expand All @@ -72,7 +73,7 @@ class BlockSerializer {
public:
BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true);
#ifdef BE_TEST
BlockSerializer() : _batch_size(0) {};
BlockSerializer() : _parent(nullptr), _is_local(true), _budget(0, 0) {};
#endif
Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized,
bool eos, const uint32_t* data = nullptr,
Expand All @@ -97,7 +98,7 @@ class BlockSerializer {
std::unique_ptr<MutableBlock> _mutable_block;

bool _is_local;
const int _batch_size;
const BlockBudget _budget;
std::atomic<size_t> _buffer_mem_limit = UINT64_MAX;
};

Expand Down
Loading
Loading