From 75171cb7c77f903c96542ea473e4083fe0ecd070 Mon Sep 17 00:00:00 2001
From: Copilot <223556219+Copilot@users.noreply.github.com>
Date: Thu, 25 Jun 2026 17:26:43 +0800
Subject: [PATCH] [feature](runtime-filter) Add adaptive global runtime filter tree publish

Issue Number: N/A

Related PR: N/A

Problem Summary: Global runtime filters are currently published from the merge coordinator directly to every scan target. When a merged filter is large and the cluster has many BEs, the coordinator sends many large duplicate RPC attachments. This change adds adaptive tree publish for global runtime filters so that each relay node forwards a bounded number of copies, based on runtime_filter_tree_publish_max_send_bytes.

Add the runtime_filter_tree_publish_max_send_bytes session variable to control adaptive global runtime filter tree publish. Set it to 0 to disable tree publish.

- Test:
    - Manual test: git diff --cached --check
    - Manual test: build-support/check-format.sh
    - Manual test: DORIS_HOME=/mnt/disk8/xiaolei/codespaces/doris2 DORIS_THIRDPARTY=/mnt/disk8/xiaolei/codespaces/doris2/thirdparty ninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/runtime_filter/runtime_filter_mgr.cpp.o src/service/CMakeFiles/Service.dir/internal_service.cpp.o
    - Manual test: ./build.sh --fe
- Behavior changed: Yes. Large global runtime filters can be published through an adaptive relay tree instead of direct coordinator fanout.
- Does this need documentation: No --- .../runtime_filter/runtime_filter_mgr.cpp | 278 ++++++++++++++++-- .../exec/runtime_filter/runtime_filter_mgr.h | 31 +- be/src/service/internal_service.cpp | 23 +- .../runtime_filter_mgr_test.cpp | 110 +++++++ .../org/apache/doris/qe/SessionVariable.java | 26 ++ gensrc/proto/internal_service.proto | 9 +- gensrc/thrift/PaloInternalService.thrift | 1 + 7 files changed, 451 insertions(+), 27 deletions(-) diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 9de74e5acb31cf..29912f5c441857 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -25,6 +25,9 @@ #include #include +#include +#include +#include #include #include #include @@ -45,9 +48,195 @@ #include "runtime/thread_context.h" #include "util/brpc_client_cache.h" #include "util/brpc_closure.h" +#include "util/uid_util.h" namespace doris { +namespace { + +std::vector build_runtime_filter_publish_targets( + const std::vector& targets) { + std::vector publish_targets; + publish_targets.reserve(targets.size()); + for (const auto& target : targets) { + DORIS_CHECK(target.__isset.target_fragment_ids); + DORIS_CHECK(!target.target_fragment_ids.empty()); + RuntimeFilterPublishTarget publish_target; + publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname); + publish_target.addr.set_port(target.target_fragment_instance_addr.port); + publish_target.fragment_ids = target.target_fragment_ids; + publish_targets.emplace_back(std::move(publish_target)); + } + return publish_targets; +} + +class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure { +public: + RuntimeFilterRelayRpcClosure(std::shared_ptr request, + std::weak_ptr query_ctx) + : _request(std::move(request)), + _callback(HandleErrorBrpcCallback::create_shared( + std::move(query_ctx))) {} + + void Run() override { + std::unique_ptr self(this); + 
_callback->call(); + } + + brpc::Controller* cntl() { return _callback->cntl_.get(); } + PPublishFilterRequestV2* request() { return _request.get(); } + PPublishFilterResponse* response() { return _callback->response_.get(); } + +private: + std::shared_ptr _request; + std::shared_ptr> _callback; +}; + +Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task, + const butil::IOBuf& request_attachment, int timeout_ms, + std::weak_ptr query_ctx) { + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init runtime filter relay rpc to " + << task.receiver.addr.hostname() << ":" << task.receiver.addr.port(); + return Status::InternalError("Failed to init runtime filter relay rpc to {}:{}", + task.receiver.addr.hostname(), task.receiver.addr.port()); + } + + // brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes itself there. + auto* closure = new RuntimeFilterRelayRpcClosure( + std::make_shared(task.request), std::move(query_ctx)); + if (!request_attachment.empty()) { + closure->cntl()->request_attachment().append(request_attachment); + } + closure->cntl()->set_timeout_ms(timeout_ms); + if (config::execution_ignore_eovercrowded) { + closure->cntl()->ignore_eovercrowded(); + } + stub->apply_filterv2(closure->cntl(), closure->request(), closure->response(), closure); + return Status::OK(); +} + +void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2& target, + PPublishFilterRequestV2* request) { + DORIS_CHECK(target.__isset.target_fragment_ids); + DORIS_CHECK(!target.target_fragment_ids.empty()); + for (const auto& target_fragment_id : target.target_fragment_ids) { + request->add_fragment_ids(target_fragment_id); + } +} + +} // namespace + +std::vector> split_runtime_filter_publish_targets( + const std::vector& targets, int fanout) { + DORIS_CHECK(!targets.empty()); + DORIS_CHECK(fanout > 0); + size_t 
slice_count = std::min(targets.size(), static_cast(fanout)); + std::vector> slices; + slices.reserve(slice_count); + for (size_t offset = 0; offset < targets.size();) { + size_t remaining_targets = targets.size() - offset; + size_t remaining_slices = slice_count - slices.size(); + size_t slice_size = (remaining_targets + remaining_slices - 1) / remaining_slices; + slices.emplace_back(targets.begin() + offset, targets.begin() + offset + slice_size); + offset += slice_size; + } + return slices; +} + +std::vector build_runtime_filter_publish_tasks( + const PPublishFilterRequestV2& base_request, + const std::vector& targets, int fanout) { + std::vector tasks; + auto slices = split_runtime_filter_publish_targets(targets, fanout); + PPublishFilterRequestV2 request_template = base_request; + request_template.clear_fragment_ids(); + request_template.clear_fragment_instance_ids(); + request_template.clear_forward_targets(); + tasks.reserve(slices.size()); + for (const auto& slice : slices) { + DORIS_CHECK(!slice.empty()); + RuntimeFilterPublishTask task; + task.receiver = slice.front(); + task.request = request_template; + + for (int32_t fragment_id : task.receiver.fragment_ids) { + task.request.add_fragment_ids(fragment_id); + } + for (size_t i = 1; i < slice.size(); ++i) { + DORIS_CHECK(!slice[i].fragment_ids.empty()); + PPublishFilterForwardTarget* forward_target = task.request.add_forward_targets(); + forward_target->mutable_target_addr()->CopyFrom(slice[i].addr); + for (int32_t fragment_id : slice[i].fragment_ids) { + forward_target->add_fragment_ids(fragment_id); + } + } + tasks.emplace_back(std::move(task)); + } + return tasks; +} + +int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count, + int64_t max_send_bytes) { + DORIS_CHECK(serialized_filter_size > 0); + DORIS_CHECK(max_send_bytes >= 0); + if (max_send_bytes == 0 || target_count <= 1) { + return 0; + } + + const int64_t direct_target_limit = max_send_bytes / 
serialized_filter_size; + if (target_count <= static_cast(direct_target_limit)) { + return 0; + } + + return static_cast(std::max(1, direct_target_limit)); +} + +Status forward_runtime_filter(const PPublishFilterRequestV2& request, + const butil::IOBuf& request_attachment, + std::weak_ptr query_ctx) { + if (request.forward_targets().empty()) { + return Status::OK(); + } + + std::vector targets; + targets.reserve(request.forward_targets_size()); + for (const auto& forward_target : request.forward_targets()) { + DORIS_CHECK(forward_target.has_target_addr()); + DORIS_CHECK(forward_target.fragment_ids_size() > 0); + RuntimeFilterPublishTarget target; + target.addr.CopyFrom(forward_target.target_addr()); + target.fragment_ids.assign(forward_target.fragment_ids().begin(), + forward_target.fragment_ids().end()); + targets.emplace_back(std::move(target)); + } + + DORIS_CHECK(request.has_tree_publish_fanout()); + int fanout = request.tree_publish_fanout(); + DORIS_CHECK(fanout > 0); + DORIS_CHECK(request.has_publish_rpc_timeout_ms()); + int timeout_ms = request.publish_rpc_timeout_ms(); + auto tasks = build_runtime_filter_publish_tasks(request, targets, fanout); + VLOG_NOTICE << "Runtime filter relay publish filter_id=" << request.filter_id() + << ", forward_targets=" << targets.size() << ", child_rpc_count=" << tasks.size() + << ", fanout=" << fanout; + auto st = Status::OK(); + for (const auto& task : tasks) { + Status rpc_st = + send_runtime_filter_relay_rpc(task, request_attachment, timeout_ms, query_ctx); + if (!rpc_st.ok()) { + LOG(WARNING) << "Failed to forward runtime filter, query_id=" + << print_id(request.query_id()) << ", filter_id=" << request.filter_id() + << ", target=" << task.receiver.addr.hostname() << ":" + << task.receiver.addr.port() << ", status=" << rpc_st; + st = std::move(rpc_st); + } + } + return st; +} + RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global) : _is_global(is_global), _tracker(std::make_unique( @@ -410,16 +599,15 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q query_ctx->ignore_runtime_filter_error() ? std::weak_ptr {} : query_ctx, - merge_time, request->query_id(), query_ctx->execution_timeout()); + merge_time, request->query_id(), query_ctx->execution_timeout(), + query_ctx->query_options()); } return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val, - std::weak_ptr ctx, - int64_t merge_time, - PUniqueId query_id, - int execution_timeout) { +Status RuntimeFilterMergeControllerEntity::_send_rf_to_target( + GlobalMergeContext& cnt_val, std::weak_ptr ctx, int64_t merge_time, + PUniqueId query_id, int execution_timeout, const TQueryOptions& query_options) { if (cnt_val.targetv2_info.empty()) { return Status::InternalError( "_send_rf_to_target called with empty targetv2_info, filter: {}", @@ -454,6 +642,66 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext } std::vector& targets = cnt_val.targetv2_info; + int timeout_ms = get_execution_rpc_timeout_ms(execution_timeout); + apply_request.set_merge_time(merge_time); + *apply_request.mutable_query_id() = query_id; + const int64_t serialized_filter_size = + static_cast(apply_request.ByteSizeLong()) + std::max(0, len); + int fanout = 0; + if (query_options.__isset.runtime_filter_tree_publish_max_send_bytes) { + int64_t max_send_bytes = query_options.runtime_filter_tree_publish_max_send_bytes; + DORIS_CHECK(max_send_bytes >= 0); + fanout = calculate_tree_publish_fanout(serialized_filter_size, targets.size(), + max_send_bytes); + } + const bool use_tree_publish = fanout > 0 && targets.size() > static_cast(fanout); + + if (use_tree_publish) { + apply_request.set_tree_publish_fanout(fanout); + apply_request.set_publish_rpc_timeout_ms(timeout_ms); + auto publish_targets = build_runtime_filter_publish_targets(targets); + auto tasks = build_runtime_filter_publish_tasks(apply_request, publish_targets, fanout); + 
cnt_val.publish_callbacks.resize(tasks.size()); + VLOG_NOTICE << "Runtime filter tree publish filter_id=" << apply_request.filter_id() + << ", serialized_bytes=" << serialized_filter_size + << ", target_count=" << targets.size() << ", child_rpc_count=" << tasks.size() + << ", fanout=" << fanout; + auto st = Status::OK(); + for (size_t i = 0; i < tasks.size(); ++i) { + const auto& task = tasks[i]; + auto callback = HandleErrorBrpcCallback::create_shared(ctx); + cnt_val.publish_callbacks[i] = callback; + auto closure = AutoReleaseClosure>:: + create_unique(std::make_shared(task.request), + callback); + + if (has_attachment) { + closure->cntl_->request_attachment().append(request_attachment); + } + closure->cntl_->set_timeout_ms(timeout_ms); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } + + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + task.receiver.addr)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init rpc to " << task.receiver.addr.hostname() << ":" + << task.receiver.addr.port(); + st = Status::InternalError("Failed to init rpc to {}:{}", + task.receiver.addr.hostname(), + task.receiver.addr.port()); + continue; + } + stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + } + return st; + } + auto st = Status::OK(); cnt_val.publish_callbacks.resize(targets.size()); for (size_t i = 0; i < targets.size(); ++i) { @@ -464,30 +712,16 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext HandleErrorBrpcCallback>:: create_unique(std::make_shared(apply_request), callback); - closure->request_->set_merge_time(merge_time); - *closure->request_->mutable_query_id() = query_id; if (has_attachment) { closure->cntl_->request_attachment().append(request_attachment); } - closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(execution_timeout)); + 
closure->cntl_->set_timeout_ms(timeout_ms); if (config::execution_ignore_eovercrowded) { closure->cntl_->ignore_eovercrowded(); } - // set fragment-id - if (target.__isset.target_fragment_ids) { - for (auto& target_fragment_id : target.target_fragment_ids) { - closure->request_->add_fragment_ids(target_fragment_id); - } - } else { - // FE not upgraded yet. - for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) { - PUniqueId* cur_id = closure->request_->add_fragment_instance_ids(); - cur_id->set_hi(target_fragment_instance_id.hi); - cur_id->set_lo(target_fragment_instance_id.lo); - } - } + set_request_direct_publish_target(target, closure->request_.get()); std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index e5c6494917cea7..365e9cba1ec940 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -35,8 +36,9 @@ #include "util/uid_util.h" namespace butil { +class IOBuf; class IOBufAsZeroCopyInputStream; -} +} // namespace butil namespace doris { class PPublishFilterRequestV2; @@ -55,6 +57,30 @@ template class HandleErrorBrpcCallback; class SyncSizeCallback; +struct RuntimeFilterPublishTarget { + PNetworkAddress addr; + std::vector fragment_ids; +}; + +struct RuntimeFilterPublishTask { + RuntimeFilterPublishTarget receiver; + PPublishFilterRequestV2 request; +}; + +std::vector> split_runtime_filter_publish_targets( + const std::vector& targets, int fanout); + +std::vector build_runtime_filter_publish_tasks( + const PPublishFilterRequestV2& base_request, + const std::vector& targets, int fanout); + +int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count, + int64_t max_send_bytes); + +Status forward_runtime_filter(const 
PPublishFilterRequestV2& request, + const butil::IOBuf& request_attachment, + std::weak_ptr query_ctx); + struct LocalMergeContext { std::shared_ptr merger; std::vector> producers; @@ -190,7 +216,8 @@ class RuntimeFilterMergeControllerEntity { const int producer_size); Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr ctx, - int64_t merge_time, PUniqueId query_id, int execution_timeout); + int64_t merge_time, PUniqueId query_id, int execution_timeout, + const TQueryOptions& query_options); // protect _filter_map AnnotatedSharedMutex _filter_map_mutex; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 057181ec80e275..ac94271cb303ec 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -66,6 +66,7 @@ #include "exec/common/variant_util.h" #include "exec/exchange/vdata_stream_mgr.h" #include "exec/rowid_fetcher.h" +#include "exec/runtime_filter/runtime_filter_mgr.h" #include "exec/sink/writer/varrow_flight_result_writer.h" #include "exec/sink/writer/vmysql_result_writer.h" #include "exprs/function/dictionary_factory.h" @@ -97,6 +98,7 @@ #include "runtime/exec_env.h" #include "runtime/fold_constant_executor.h" #include "runtime/fragment_mgr.h" +#include "runtime/query_context.h" #include "runtime/result_block_buffer.h" #include "runtime/result_buffer_mgr.h" #include "runtime/runtime_profile.h" @@ -1570,8 +1572,11 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { signal::SignalTaskIdKeeper keeper(request->query_id()); brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + const butil::IOBuf& request_attachment = + static_cast(controller)->request_attachment(); + butil::IOBuf apply_attachment = request_attachment; + butil::IOBuf 
forward_attachment = request_attachment; + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(apply_attachment); VLOG_NOTICE << "rpc apply_filterv2 recv"; Status st; try { @@ -1582,6 +1587,20 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control if (!st.ok()) { LOG(WARNING) << "apply filter meet error: " << st.to_string(); } + std::weak_ptr forward_ctx; + if (auto query_ctx = _exec_env->fragment_mgr()->get_query_ctx( + UniqueId(request->query_id()).to_thrift())) { + if (!query_ctx->ignore_runtime_filter_error()) { + forward_ctx = query_ctx; + } + } + Status forward_st = forward_runtime_filter(*request, forward_attachment, forward_ctx); + if (!forward_st.ok()) { + LOG(WARNING) << "forward runtime filter meet error: " << forward_st.to_string(); + if (st.ok()) { + st = std::move(forward_st); + } + } st.to_protobuf(response->mutable_status()); }); if (!ret) { diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index 5a50e762c2527f..5f69f411cf1e31 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -20,12 +20,37 @@ #include #include +#include + #include "exec/pipeline/thrift_builder.h" #include "exec/runtime_filter/runtime_filter_producer.h" #include "runtime/query_context.h" #include "testutil/mock/mock_runtime_state.h" namespace doris { +namespace { + +RuntimeFilterPublishTarget make_publish_target(int index) { + RuntimeFilterPublishTarget target; + target.addr.set_hostname("host" + std::to_string(index)); + target.addr.set_port(9000 + index); + target.fragment_ids.push_back(index); + return target; +} + +std::vector flatten_fragment_ids( + const std::vector>& slices) { + std::vector fragment_ids; + for (const auto& slice : slices) { + for (const auto& target : slice) { + fragment_ids.insert(fragment_ids.end(), target.fragment_ids.begin(), + target.fragment_ids.end()); + } + } + 
return fragment_ids; +} + +} // namespace class RuntimeFilterMgrTest : public testing::Test { public: @@ -186,4 +211,89 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) { } } +TEST_F(RuntimeFilterMgrTest, SplitRuntimeFilterPublishTargets) { + std::vector targets; + for (int i = 0; i < 10; ++i) { + targets.push_back(make_publish_target(i)); + } + + auto slices = split_runtime_filter_publish_targets(targets, 3); + ASSERT_EQ(slices.size(), 3); + EXPECT_EQ(slices[0].size(), 4); + EXPECT_EQ(slices[1].size(), 3); + EXPECT_EQ(slices[2].size(), 3); + EXPECT_EQ(flatten_fragment_ids(slices), (std::vector {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + + slices = split_runtime_filter_publish_targets(targets, 20); + ASSERT_EQ(slices.size(), targets.size()); + for (const auto& slice : slices) { + EXPECT_EQ(slice.size(), 1); + } + + slices = split_runtime_filter_publish_targets(targets, 1); + ASSERT_EQ(slices.size(), 1); + EXPECT_EQ(slices[0].size(), targets.size()); +} + +TEST_F(RuntimeFilterMgrTest, CalculateTreePublishFanout) { + constexpr int64_t MB = 1024L * 1024L; + constexpr int64_t max_send_bytes = 256L * MB; + constexpr size_t target_count = 48; + + EXPECT_EQ(calculate_tree_publish_fanout(1L * MB, target_count, max_send_bytes), 0); + EXPECT_EQ(calculate_tree_publish_fanout(4L * MB, target_count, max_send_bytes), 0); + EXPECT_EQ(calculate_tree_publish_fanout(8L * MB, target_count, max_send_bytes), 32); + EXPECT_EQ(calculate_tree_publish_fanout(16L * MB, target_count, max_send_bytes), 16); + EXPECT_EQ(calculate_tree_publish_fanout(32L * MB, target_count, max_send_bytes), 8); + EXPECT_EQ(calculate_tree_publish_fanout(64L * MB, target_count, max_send_bytes), 4); + EXPECT_EQ(calculate_tree_publish_fanout(128L * MB, target_count, max_send_bytes), 2); + + EXPECT_EQ(calculate_tree_publish_fanout(4L * MB, target_count, 0), 0); + EXPECT_EQ(calculate_tree_publish_fanout(256L * MB, target_count, max_send_bytes), 1); + EXPECT_EQ(calculate_tree_publish_fanout(1L * MB, 
1024, max_send_bytes), 256); +} + +TEST_F(RuntimeFilterMgrTest, BuildRuntimeFilterPublishTasks) { + PPublishFilterRequestV2 base_request; + base_request.set_filter_id(10); + base_request.mutable_query_id()->set_hi(1); + base_request.mutable_query_id()->set_lo(2); + base_request.set_filter_type(PFilterType::BLOOM_FILTER); + base_request.set_tree_publish_fanout(2); + base_request.set_publish_rpc_timeout_ms(3000); + base_request.add_fragment_ids(999); + auto* stale_forward_target = base_request.add_forward_targets(); + stale_forward_target->mutable_target_addr()->set_hostname("stale"); + stale_forward_target->mutable_target_addr()->set_port(1); + stale_forward_target->add_fragment_ids(999); + + std::vector targets; + for (int i = 0; i < 5; ++i) { + targets.push_back(make_publish_target(i)); + } + + auto tasks = build_runtime_filter_publish_tasks(base_request, targets, 2); + ASSERT_EQ(tasks.size(), 2); + + EXPECT_EQ(tasks[0].receiver.addr.hostname(), "host0"); + EXPECT_EQ(tasks[0].request.fragment_ids_size(), 1); + EXPECT_EQ(tasks[0].request.fragment_ids(0), 0); + ASSERT_EQ(tasks[0].request.forward_targets_size(), 2); + EXPECT_EQ(tasks[0].request.forward_targets(0).target_addr().hostname(), "host1"); + EXPECT_EQ(tasks[0].request.forward_targets(0).fragment_ids(0), 1); + EXPECT_EQ(tasks[0].request.forward_targets(1).target_addr().hostname(), "host2"); + EXPECT_EQ(tasks[0].request.forward_targets(1).fragment_ids(0), 2); + + EXPECT_EQ(tasks[1].receiver.addr.hostname(), "host3"); + EXPECT_EQ(tasks[1].request.fragment_ids_size(), 1); + EXPECT_EQ(tasks[1].request.fragment_ids(0), 3); + ASSERT_EQ(tasks[1].request.forward_targets_size(), 1); + EXPECT_EQ(tasks[1].request.forward_targets(0).target_addr().hostname(), "host4"); + EXPECT_EQ(tasks[1].request.forward_targets(0).fragment_ids(0), 4); + + EXPECT_EQ(tasks[0].request.filter_id(), base_request.filter_id()); + EXPECT_EQ(tasks[0].request.tree_publish_fanout(), 2); + EXPECT_EQ(tasks[0].request.publish_rpc_timeout_ms(), 3000); 
+} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 845030a37c163a..faa24331feedd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -258,6 +258,8 @@ public class SessionVariable implements Serializable, Writable { "runtime_filter_broadcast_join_producer_num"; public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size"; + public static final String RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES = + "runtime_filter_tree_publish_max_send_bytes"; public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink"; @@ -1761,6 +1763,10 @@ public enum IgnoreSplitType { + "The legacy Coordinator path keeps the existing behavior."}) private int runtimeFilterBroadcastJoinProducerNum = 3; + @VarAttrDef.VarAttr(name = RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES, needForward = true, fuzzy = true, + checker = "checkRuntimeFilterTreePublishMaxSendBytes") + private long runtimeFilterTreePublishMaxSendBytes = 256L * 1024L * 1024L; + @VarAttrDef.VarAttr(name = "runtime_filter_max_build_row_count", needForward = true, fuzzy = false) public long runtimeFilterMaxBuildRowCount = 64L * 1024L * 1024L; @@ -3859,6 +3865,9 @@ public void initFuzzyModeVariables() { this.enableParallelScan = random.nextInt(2) == 0; this.enableRuntimeFilterPrune = (randomInt % 10) == 0; this.enableRuntimeFilterPartitionPrune = (randomInt % 2) == 0; + this.runtimeFilterTreePublishMaxSendBytes = + Util.getRandomLong(0, 64L * 1024L * 1024L, 128L * 1024L * 1024L, + 256L * 1024L * 1024L); switch (randomInt) { case 0: @@ -4788,6 +4797,10 @@ public void setRuntimeFilterBroadcastJoinProducerNum(int runtimeFilterBroadcastJ this.runtimeFilterBroadcastJoinProducerNum = runtimeFilterBroadcastJoinProducerNum; } + public long 
getRuntimeFilterTreePublishMaxSendBytes() { + return runtimeFilterTreePublishMaxSendBytes; + } + public void setEnableLocalShuffle(boolean enableLocalShuffle) { this.enableLocalShuffle = enableLocalShuffle; } @@ -5606,6 +5619,7 @@ public TQueryOptions toThrift() { tResult.setRuntimeBloomFilterMinSize(runtimeBloomFilterMinSize); tResult.setRuntimeBloomFilterMaxSize(runtimeBloomFilterMaxSize); tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely); + tResult.setRuntimeFilterTreePublishMaxSendBytes(runtimeFilterTreePublishMaxSendBytes); tResult.setEnableFuzzyBlockableTask(enableFuzzyBlockableTask); tResult.setEnableFunctionPushdown(enableFunctionPushdown); @@ -6543,6 +6557,18 @@ public void checkProfileLevel(String profileLevel) { } } + public void checkRuntimeFilterTreePublishMaxSendBytes(String maxSendBytes) { + long value = Long.valueOf(maxSendBytes); + if (value < 0) { + UnsupportedOperationException exception = + new UnsupportedOperationException( + "runtime_filter_tree_publish_max_send_bytes can not be set to " + + maxSendBytes + ", it must be greater or equal to 0"); + LOG.warn("Check runtime_filter_tree_publish_max_send_bytes failed", exception); + throw exception; + } + } + public void checkSqlConvertorFeatures(String features) { if (Strings.isNullOrEmpty(features)) { return; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index c5818339f3a66e..cd5fe59532c58d 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -696,8 +696,16 @@ message PPublishFilterRequestV2 { optional uint64 local_merge_time = 13; optional bool disabled = 14; optional uint32 stage = 15; + repeated PPublishFilterForwardTarget forward_targets = 16; + optional int32 tree_publish_fanout = 17; + optional int32 publish_rpc_timeout_ms = 18; }; +message PPublishFilterForwardTarget { + optional PNetworkAddress target_addr = 1; + repeated int32 fragment_ids = 2; +} + message PPublishFilterResponse { required 
PStatus status = 1; }; @@ -1267,4 +1275,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cd2292ca6b63c3..f56096f711c9aa 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -504,6 +504,7 @@ struct TQueryOptions { // To control whether BE scan readers may apply expression-based ZoneMap pruning. 224: optional bool enable_expr_zonemap_filter = true + 225: optional i64 runtime_filter_tree_publish_max_send_bytes = 268435456 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.