diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 9de74e5acb31cf..29912f5c441857 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -25,6 +25,9 @@ #include #include +#include +#include +#include #include #include #include @@ -45,9 +48,195 @@ #include "runtime/thread_context.h" #include "util/brpc_client_cache.h" #include "util/brpc_closure.h" +#include "util/uid_util.h" namespace doris { +namespace { + +std::vector build_runtime_filter_publish_targets( + const std::vector& targets) { + std::vector publish_targets; + publish_targets.reserve(targets.size()); + for (const auto& target : targets) { + DORIS_CHECK(target.__isset.target_fragment_ids); + DORIS_CHECK(!target.target_fragment_ids.empty()); + RuntimeFilterPublishTarget publish_target; + publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname); + publish_target.addr.set_port(target.target_fragment_instance_addr.port); + publish_target.fragment_ids = target.target_fragment_ids; + publish_targets.emplace_back(std::move(publish_target)); + } + return publish_targets; +} + +class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure { +public: + RuntimeFilterRelayRpcClosure(std::shared_ptr request, + std::weak_ptr query_ctx) + : _request(std::move(request)), + _callback(HandleErrorBrpcCallback::create_shared( + std::move(query_ctx))) {} + + void Run() override { + std::unique_ptr self(this); + _callback->call(); + } + + brpc::Controller* cntl() { return _callback->cntl_.get(); } + PPublishFilterRequestV2* request() { return _request.get(); } + PPublishFilterResponse* response() { return _callback->response_.get(); } + +private: + std::shared_ptr _request; + std::shared_ptr> _callback; +}; + +Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task, + const butil::IOBuf& request_attachment, int timeout_ms, + std::weak_ptr query_ctx) { + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init runtime filter relay rpc to " + << task.receiver.addr.hostname() << ":" << task.receiver.addr.port(); + return Status::InternalError("Failed to init runtime filter relay rpc to {}:{}", + task.receiver.addr.hostname(), task.receiver.addr.port()); + } + + // brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes itself there. + auto* closure = new RuntimeFilterRelayRpcClosure( + std::make_shared(task.request), std::move(query_ctx)); + if (!request_attachment.empty()) { + closure->cntl()->request_attachment().append(request_attachment); + } + closure->cntl()->set_timeout_ms(timeout_ms); + if (config::execution_ignore_eovercrowded) { + closure->cntl()->ignore_eovercrowded(); + } + stub->apply_filterv2(closure->cntl(), closure->request(), closure->response(), closure); + return Status::OK(); +} + +void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2& target, + PPublishFilterRequestV2* request) { + DORIS_CHECK(target.__isset.target_fragment_ids); + DORIS_CHECK(!target.target_fragment_ids.empty()); + for (const auto& target_fragment_id : target.target_fragment_ids) { + request->add_fragment_ids(target_fragment_id); + } +} + +} // namespace + +std::vector> split_runtime_filter_publish_targets( + const std::vector& targets, int fanout) { + DORIS_CHECK(!targets.empty()); + DORIS_CHECK(fanout > 0); + size_t slice_count = std::min(targets.size(), static_cast(fanout)); + std::vector> slices; + slices.reserve(slice_count); + for (size_t offset = 0; offset < targets.size();) { + size_t remaining_targets = targets.size() - offset; + size_t remaining_slices = slice_count - slices.size(); + size_t slice_size = (remaining_targets + remaining_slices - 1) / remaining_slices; + slices.emplace_back(targets.begin() + offset, targets.begin() + offset + slice_size); + offset += slice_size; + } + return slices; +} + +std::vector build_runtime_filter_publish_tasks( + const PPublishFilterRequestV2& base_request, + const std::vector& targets, int fanout) { + std::vector tasks; + auto slices = split_runtime_filter_publish_targets(targets, fanout); + PPublishFilterRequestV2 request_template = base_request; + request_template.clear_fragment_ids(); + request_template.clear_fragment_instance_ids(); + request_template.clear_forward_targets(); + tasks.reserve(slices.size()); + for (const auto& slice : slices) { + DORIS_CHECK(!slice.empty()); + RuntimeFilterPublishTask task; + task.receiver = slice.front(); + task.request = request_template; + + for (int32_t fragment_id : task.receiver.fragment_ids) { + task.request.add_fragment_ids(fragment_id); + } + for (size_t i = 1; i < slice.size(); ++i) { + DORIS_CHECK(!slice[i].fragment_ids.empty()); + PPublishFilterForwardTarget* forward_target = task.request.add_forward_targets(); + forward_target->mutable_target_addr()->CopyFrom(slice[i].addr); + for (int32_t fragment_id : slice[i].fragment_ids) { + forward_target->add_fragment_ids(fragment_id); + } + } + tasks.emplace_back(std::move(task)); + } + return tasks; +} + +int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count, + int64_t max_send_bytes) { + DORIS_CHECK(serialized_filter_size > 0); + DORIS_CHECK(max_send_bytes >= 0); + if (max_send_bytes == 0 || target_count <= 1) { + return 0; + } + + const int64_t direct_target_limit = max_send_bytes / serialized_filter_size; + if (target_count <= static_cast(direct_target_limit)) { + return 0; + } + + return static_cast(std::max(1, direct_target_limit)); +} + +Status forward_runtime_filter(const PPublishFilterRequestV2& request, + const butil::IOBuf& request_attachment, + std::weak_ptr query_ctx) { + if (request.forward_targets().empty()) { + return Status::OK(); + } + + std::vector targets; + targets.reserve(request.forward_targets_size()); + for (const auto& forward_target : request.forward_targets()) { + DORIS_CHECK(forward_target.has_target_addr()); + DORIS_CHECK(forward_target.fragment_ids_size() > 0); + RuntimeFilterPublishTarget target; + target.addr.CopyFrom(forward_target.target_addr()); + target.fragment_ids.assign(forward_target.fragment_ids().begin(), + forward_target.fragment_ids().end()); + targets.emplace_back(std::move(target)); + } + + DORIS_CHECK(request.has_tree_publish_fanout()); + int fanout = request.tree_publish_fanout(); + DORIS_CHECK(fanout > 0); + DORIS_CHECK(request.has_publish_rpc_timeout_ms()); + int timeout_ms = request.publish_rpc_timeout_ms(); + auto tasks = build_runtime_filter_publish_tasks(request, targets, fanout); + VLOG_NOTICE << "Runtime filter relay publish filter_id=" << request.filter_id() + << ", forward_targets=" << targets.size() << ", child_rpc_count=" << tasks.size() + << ", fanout=" << fanout; + auto st = Status::OK(); + for (const auto& task : tasks) { + Status rpc_st = + send_runtime_filter_relay_rpc(task, request_attachment, timeout_ms, query_ctx); + if (!rpc_st.ok()) { + LOG(WARNING) << "Failed to forward runtime filter, query_id=" + << print_id(request.query_id()) << ", filter_id=" << request.filter_id() + << ", target=" << task.receiver.addr.hostname() << ":" + << task.receiver.addr.port() << ", status=" << rpc_st; + st = std::move(rpc_st); + } + } + return st; +} + RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global) : _is_global(is_global), _tracker(std::make_unique( @@ -410,16 +599,15 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q query_ctx->ignore_runtime_filter_error() ? std::weak_ptr {} : query_ctx, - merge_time, request->query_id(), query_ctx->execution_timeout()); + merge_time, request->query_id(), query_ctx->execution_timeout(), + query_ctx->query_options()); } return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val, - std::weak_ptr ctx, - int64_t merge_time, - PUniqueId query_id, - int execution_timeout) { +Status RuntimeFilterMergeControllerEntity::_send_rf_to_target( + GlobalMergeContext& cnt_val, std::weak_ptr ctx, int64_t merge_time, + PUniqueId query_id, int execution_timeout, const TQueryOptions& query_options) { if (cnt_val.targetv2_info.empty()) { return Status::InternalError( "_send_rf_to_target called with empty targetv2_info, filter: {}", @@ -454,6 +642,66 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext } std::vector& targets = cnt_val.targetv2_info; + int timeout_ms = get_execution_rpc_timeout_ms(execution_timeout); + apply_request.set_merge_time(merge_time); + *apply_request.mutable_query_id() = query_id; + const int64_t serialized_filter_size = + static_cast(apply_request.ByteSizeLong()) + std::max(0, len); + int fanout = 0; + if (query_options.__isset.runtime_filter_tree_publish_max_send_bytes) { + int64_t max_send_bytes = query_options.runtime_filter_tree_publish_max_send_bytes; + DORIS_CHECK(max_send_bytes >= 0); + fanout = calculate_tree_publish_fanout(serialized_filter_size, targets.size(), + max_send_bytes); + } + const bool use_tree_publish = fanout > 0 && targets.size() > static_cast(fanout); + + if (use_tree_publish) { + apply_request.set_tree_publish_fanout(fanout); + apply_request.set_publish_rpc_timeout_ms(timeout_ms); + auto publish_targets = build_runtime_filter_publish_targets(targets); + auto tasks = build_runtime_filter_publish_tasks(apply_request, publish_targets, fanout); + cnt_val.publish_callbacks.resize(tasks.size()); + VLOG_NOTICE << "Runtime filter tree publish filter_id=" << apply_request.filter_id() + << ", serialized_bytes=" << serialized_filter_size + << ", target_count=" << targets.size() << ", child_rpc_count=" << tasks.size() + << ", fanout=" << fanout; + auto st = Status::OK(); + for (size_t i = 0; i < tasks.size(); ++i) { + const auto& task = tasks[i]; + auto callback = HandleErrorBrpcCallback::create_shared(ctx); + cnt_val.publish_callbacks[i] = callback; + auto closure = AutoReleaseClosure>:: + create_unique(std::make_shared(task.request), + callback); + + if (has_attachment) { + closure->cntl_->request_attachment().append(request_attachment); + } + closure->cntl_->set_timeout_ms(timeout_ms); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } + + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + task.receiver.addr)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init rpc to " << task.receiver.addr.hostname() << ":" + << task.receiver.addr.port(); + st = Status::InternalError("Failed to init rpc to {}:{}", + task.receiver.addr.hostname(), + task.receiver.addr.port()); + continue; + } + stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + } + return st; + } + auto st = Status::OK(); cnt_val.publish_callbacks.resize(targets.size()); for (size_t i = 0; i < targets.size(); ++i) { @@ -464,30 +712,16 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext HandleErrorBrpcCallback>:: create_unique(std::make_shared(apply_request), callback); - closure->request_->set_merge_time(merge_time); - *closure->request_->mutable_query_id() = query_id; if (has_attachment) { closure->cntl_->request_attachment().append(request_attachment); } - closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(execution_timeout)); + closure->cntl_->set_timeout_ms(timeout_ms); if (config::execution_ignore_eovercrowded) { closure->cntl_->ignore_eovercrowded(); } - // set fragment-id - if (target.__isset.target_fragment_ids) { - for (auto& target_fragment_id : target.target_fragment_ids) { - closure->request_->add_fragment_ids(target_fragment_id); - } - } else { - // FE not upgraded yet. - for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) { - PUniqueId* cur_id = closure->request_->add_fragment_instance_ids(); - cur_id->set_hi(target_fragment_instance_id.hi); - cur_id->set_lo(target_fragment_instance_id.lo); - } - } + set_request_direct_publish_target(target, closure->request_.get()); std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index e5c6494917cea7..365e9cba1ec940 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -35,8 +36,9 @@ #include "util/uid_util.h" namespace butil { +class IOBuf; class IOBufAsZeroCopyInputStream; -} +} // namespace butil namespace doris { class PPublishFilterRequestV2; @@ -55,6 +57,30 @@ template class HandleErrorBrpcCallback; class SyncSizeCallback; +struct RuntimeFilterPublishTarget { + PNetworkAddress addr; + std::vector fragment_ids; +}; + +struct RuntimeFilterPublishTask { + RuntimeFilterPublishTarget receiver; + PPublishFilterRequestV2 request; +}; + +std::vector> split_runtime_filter_publish_targets( + const std::vector& targets, int fanout); + +std::vector build_runtime_filter_publish_tasks( + const PPublishFilterRequestV2& base_request, + const std::vector& targets, int fanout); + +int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count, + int64_t max_send_bytes); + +Status forward_runtime_filter(const PPublishFilterRequestV2& request, + const butil::IOBuf& request_attachment, + std::weak_ptr query_ctx); + struct LocalMergeContext { std::shared_ptr merger; std::vector> producers; @@ -190,7 +216,8 @@ class RuntimeFilterMergeControllerEntity { const int producer_size); Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr ctx, - int64_t merge_time, PUniqueId query_id, int execution_timeout); + int64_t merge_time, PUniqueId query_id, int execution_timeout, + const TQueryOptions& query_options); // protect _filter_map AnnotatedSharedMutex _filter_map_mutex; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 057181ec80e275..ac94271cb303ec 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -66,6 +66,7 @@ #include "exec/common/variant_util.h" #include "exec/exchange/vdata_stream_mgr.h" #include "exec/rowid_fetcher.h" +#include "exec/runtime_filter/runtime_filter_mgr.h" #include "exec/sink/writer/varrow_flight_result_writer.h" #include "exec/sink/writer/vmysql_result_writer.h" #include "exprs/function/dictionary_factory.h" @@ -97,6 +98,7 @@ #include "runtime/exec_env.h" #include "runtime/fold_constant_executor.h" #include "runtime/fragment_mgr.h" +#include "runtime/query_context.h" #include "runtime/result_block_buffer.h" #include "runtime/result_buffer_mgr.h" #include "runtime/runtime_profile.h" @@ -1570,8 +1572,11 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { signal::SignalTaskIdKeeper keeper(request->query_id()); brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + const butil::IOBuf& request_attachment = + static_cast(controller)->request_attachment(); + butil::IOBuf apply_attachment = request_attachment; + butil::IOBuf forward_attachment = request_attachment; + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(apply_attachment); VLOG_NOTICE << "rpc apply_filterv2 recv"; Status st; try { @@ -1582,6 +1587,20 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control if (!st.ok()) { LOG(WARNING) << "apply filter meet error: " << st.to_string(); } + std::weak_ptr forward_ctx; + if (auto query_ctx = _exec_env->fragment_mgr()->get_query_ctx( + UniqueId(request->query_id()).to_thrift())) { + if (!query_ctx->ignore_runtime_filter_error()) { + forward_ctx = query_ctx; + } + } + Status forward_st = forward_runtime_filter(*request, forward_attachment, forward_ctx); + if (!forward_st.ok()) { + LOG(WARNING) << "forward runtime filter meet error: " << forward_st.to_string(); + if (st.ok()) { + st = std::move(forward_st); + } + } st.to_protobuf(response->mutable_status()); }); if (!ret) { diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index 5a50e762c2527f..5f69f411cf1e31 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -20,12 +20,37 @@ #include #include +#include + #include "exec/pipeline/thrift_builder.h" #include "exec/runtime_filter/runtime_filter_producer.h" #include "runtime/query_context.h" #include "testutil/mock/mock_runtime_state.h" namespace doris { +namespace { + +RuntimeFilterPublishTarget make_publish_target(int index) { + RuntimeFilterPublishTarget target; + target.addr.set_hostname("host" + std::to_string(index)); + target.addr.set_port(9000 + index); + target.fragment_ids.push_back(index); + return target; +} + +std::vector flatten_fragment_ids( + const std::vector>& slices) { + std::vector fragment_ids; + for (const auto& slice : slices) { + for (const auto& target : slice) { + fragment_ids.insert(fragment_ids.end(), target.fragment_ids.begin(), + target.fragment_ids.end()); + } + } + return fragment_ids; +} + +} // namespace class RuntimeFilterMgrTest : public testing::Test { public: @@ -186,4 +211,89 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) { } } +TEST_F(RuntimeFilterMgrTest, SplitRuntimeFilterPublishTargets) { + std::vector targets; + for (int i = 0; i < 10; ++i) { + targets.push_back(make_publish_target(i)); + } + + auto slices = split_runtime_filter_publish_targets(targets, 3); + ASSERT_EQ(slices.size(), 3); + EXPECT_EQ(slices[0].size(), 4); + EXPECT_EQ(slices[1].size(), 3); + EXPECT_EQ(slices[2].size(), 3); + EXPECT_EQ(flatten_fragment_ids(slices), (std::vector {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + + slices = split_runtime_filter_publish_targets(targets, 20); + ASSERT_EQ(slices.size(), targets.size()); + for (const auto& slice : slices) { + EXPECT_EQ(slice.size(), 1); + } + + slices = split_runtime_filter_publish_targets(targets, 1); + ASSERT_EQ(slices.size(), 1); + EXPECT_EQ(slices[0].size(), targets.size()); +} + +TEST_F(RuntimeFilterMgrTest, CalculateTreePublishFanout) { + constexpr int64_t MB = 1024L * 1024L; + constexpr int64_t max_send_bytes = 256L * MB; + constexpr size_t target_count = 48; + + EXPECT_EQ(calculate_tree_publish_fanout(1L * MB, target_count, max_send_bytes), 0); + EXPECT_EQ(calculate_tree_publish_fanout(4L * MB, target_count, max_send_bytes), 0); + EXPECT_EQ(calculate_tree_publish_fanout(8L * MB, target_count, max_send_bytes), 32); + EXPECT_EQ(calculate_tree_publish_fanout(16L * MB, target_count, max_send_bytes), 16); + EXPECT_EQ(calculate_tree_publish_fanout(32L * MB, target_count, max_send_bytes), 8); + EXPECT_EQ(calculate_tree_publish_fanout(64L * MB, target_count, max_send_bytes), 4); + EXPECT_EQ(calculate_tree_publish_fanout(128L * MB, target_count, max_send_bytes), 2); + + EXPECT_EQ(calculate_tree_publish_fanout(4L * MB, target_count, 0), 0); + EXPECT_EQ(calculate_tree_publish_fanout(256L * MB, target_count, max_send_bytes), 1); + EXPECT_EQ(calculate_tree_publish_fanout(1L * MB, 1024, max_send_bytes), 256); +} + +TEST_F(RuntimeFilterMgrTest, BuildRuntimeFilterPublishTasks) { + PPublishFilterRequestV2 base_request; + base_request.set_filter_id(10); + base_request.mutable_query_id()->set_hi(1); + base_request.mutable_query_id()->set_lo(2); + base_request.set_filter_type(PFilterType::BLOOM_FILTER); + base_request.set_tree_publish_fanout(2); + base_request.set_publish_rpc_timeout_ms(3000); + base_request.add_fragment_ids(999); + auto* stale_forward_target = base_request.add_forward_targets(); + stale_forward_target->mutable_target_addr()->set_hostname("stale"); + stale_forward_target->mutable_target_addr()->set_port(1); + stale_forward_target->add_fragment_ids(999); + + std::vector targets; + for (int i = 0; i < 5; ++i) { + targets.push_back(make_publish_target(i)); + } + + auto tasks = build_runtime_filter_publish_tasks(base_request, targets, 2); + ASSERT_EQ(tasks.size(), 2); + + EXPECT_EQ(tasks[0].receiver.addr.hostname(), "host0"); + EXPECT_EQ(tasks[0].request.fragment_ids_size(), 1); + EXPECT_EQ(tasks[0].request.fragment_ids(0), 0); + ASSERT_EQ(tasks[0].request.forward_targets_size(), 2); + EXPECT_EQ(tasks[0].request.forward_targets(0).target_addr().hostname(), "host1"); + EXPECT_EQ(tasks[0].request.forward_targets(0).fragment_ids(0), 1); + EXPECT_EQ(tasks[0].request.forward_targets(1).target_addr().hostname(), "host2"); + EXPECT_EQ(tasks[0].request.forward_targets(1).fragment_ids(0), 2); + + EXPECT_EQ(tasks[1].receiver.addr.hostname(), "host3"); + EXPECT_EQ(tasks[1].request.fragment_ids_size(), 1); + EXPECT_EQ(tasks[1].request.fragment_ids(0), 3); + ASSERT_EQ(tasks[1].request.forward_targets_size(), 1); + EXPECT_EQ(tasks[1].request.forward_targets(0).target_addr().hostname(), "host4"); + EXPECT_EQ(tasks[1].request.forward_targets(0).fragment_ids(0), 4); + + EXPECT_EQ(tasks[0].request.filter_id(), base_request.filter_id()); + EXPECT_EQ(tasks[0].request.tree_publish_fanout(), 2); + EXPECT_EQ(tasks[0].request.publish_rpc_timeout_ms(), 3000); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 845030a37c163a..faa24331feedd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -258,6 +258,8 @@ public class SessionVariable implements Serializable, Writable { "runtime_filter_broadcast_join_producer_num"; public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size"; + public static final String RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES = + "runtime_filter_tree_publish_max_send_bytes"; public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink"; @@ -1761,6 +1763,10 @@ public enum IgnoreSplitType { + "The legacy Coordinator path keeps the existing behavior."}) private int runtimeFilterBroadcastJoinProducerNum = 3; + @VarAttrDef.VarAttr(name = RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES, needForward = true, fuzzy = true, + checker = "checkRuntimeFilterTreePublishMaxSendBytes") + private long runtimeFilterTreePublishMaxSendBytes = 256L * 1024L * 1024L; + @VarAttrDef.VarAttr(name = "runtime_filter_max_build_row_count", needForward = true, fuzzy = false) public long runtimeFilterMaxBuildRowCount = 64L * 1024L * 1024L; @@ -3859,6 +3865,9 @@ public void initFuzzyModeVariables() { this.enableParallelScan = random.nextInt(2) == 0; this.enableRuntimeFilterPrune = (randomInt % 10) == 0; this.enableRuntimeFilterPartitionPrune = (randomInt % 2) == 0; + this.runtimeFilterTreePublishMaxSendBytes = + Util.getRandomLong(0, 64L * 1024L * 1024L, 128L * 1024L * 1024L, + 256L * 1024L * 1024L); switch (randomInt) { case 0: @@ -4788,6 +4797,10 @@ public void setRuntimeFilterBroadcastJoinProducerNum(int runtimeFilterBroadcastJ this.runtimeFilterBroadcastJoinProducerNum = runtimeFilterBroadcastJoinProducerNum; } + public long getRuntimeFilterTreePublishMaxSendBytes() { + return runtimeFilterTreePublishMaxSendBytes; + } + public void setEnableLocalShuffle(boolean enableLocalShuffle) { this.enableLocalShuffle = enableLocalShuffle; } @@ -5606,6 +5619,7 @@ public TQueryOptions toThrift() { tResult.setRuntimeBloomFilterMinSize(runtimeBloomFilterMinSize); tResult.setRuntimeBloomFilterMaxSize(runtimeBloomFilterMaxSize); tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely); + tResult.setRuntimeFilterTreePublishMaxSendBytes(runtimeFilterTreePublishMaxSendBytes); tResult.setEnableFuzzyBlockableTask(enableFuzzyBlockableTask); tResult.setEnableFunctionPushdown(enableFunctionPushdown); @@ -6543,6 +6557,18 @@ public void checkProfileLevel(String profileLevel) { } } + public void checkRuntimeFilterTreePublishMaxSendBytes(String maxSendBytes) { + long value = Long.valueOf(maxSendBytes); + if (value < 0) { + UnsupportedOperationException exception = + new UnsupportedOperationException( + "runtime_filter_tree_publish_max_send_bytes can not be set to " + + maxSendBytes + ", it must be greater or equal to 0"); + LOG.warn("Check runtime_filter_tree_publish_max_send_bytes failed", exception); + throw exception; + } + } + public void checkSqlConvertorFeatures(String features) { if (Strings.isNullOrEmpty(features)) { return; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index c5818339f3a66e..cd5fe59532c58d 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -696,8 +696,16 @@ message PPublishFilterRequestV2 { optional uint64 local_merge_time = 13; optional bool disabled = 14; optional uint32 stage = 15; + repeated PPublishFilterForwardTarget forward_targets = 16; + optional int32 tree_publish_fanout = 17; + optional int32 publish_rpc_timeout_ms = 18; }; +message PPublishFilterForwardTarget { + optional PNetworkAddress target_addr = 1; + repeated int32 fragment_ids = 2; +} + message PPublishFilterResponse { required PStatus status = 1; }; @@ -1267,4 +1275,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cd2292ca6b63c3..f56096f711c9aa 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -504,6 +504,7 @@ struct TQueryOptions { // To control whether BE scan readers may apply expression-based ZoneMap pruning. 224: optional bool enable_expr_zonemap_filter = true + 225: optional i64 runtime_filter_tree_publish_max_send_bytes = 268435456 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.