Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 256 additions & 22 deletions be/src/exec/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <ostream>
#include <string>
Expand All @@ -45,9 +48,195 @@
#include "runtime/thread_context.h"
#include "util/brpc_client_cache.h"
#include "util/brpc_closure.h"
#include "util/uid_util.h"

namespace doris {

namespace {

std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
std::vector<RuntimeFilterPublishTarget> publish_targets;
publish_targets.reserve(targets.size());
for (const auto& target : targets) {
DORIS_CHECK(target.__isset.target_fragment_ids);
DORIS_CHECK(!target.target_fragment_ids.empty());
RuntimeFilterPublishTarget publish_target;
publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname);
publish_target.addr.set_port(target.target_fragment_instance_addr.port);
publish_target.fragment_ids = target.target_fragment_ids;
publish_targets.emplace_back(std::move(publish_target));
}
return publish_targets;
}

class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
public:
RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> request,
std::weak_ptr<QueryContext> query_ctx)
: _request(std::move(request)),
_callback(HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(
std::move(query_ctx))) {}

void Run() override {
Comment thread
BiteTheDDDDt marked this conversation as resolved.
std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
_callback->call();
}

brpc::Controller* cntl() { return _callback->cntl_.get(); }
PPublishFilterRequestV2* request() { return _request.get(); }
PPublishFilterResponse* response() { return _callback->response_.get(); }

private:
std::shared_ptr<PPublishFilterRequestV2> _request;
std::shared_ptr<HandleErrorBrpcCallback<PPublishFilterResponse>> _callback;
};

Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
const butil::IOBuf& request_attachment, int timeout_ms,
std::weak_ptr<QueryContext> query_ctx) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr));
if (stub == nullptr) {
Comment thread
BiteTheDDDDt marked this conversation as resolved.
LOG(WARNING) << "Failed to init runtime filter relay rpc to "
<< task.receiver.addr.hostname() << ":" << task.receiver.addr.port();
return Status::InternalError("Failed to init runtime filter relay rpc to {}:{}",
task.receiver.addr.hostname(), task.receiver.addr.port());
}

// brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes itself there.
auto* closure = new RuntimeFilterRelayRpcClosure(
Comment thread
BiteTheDDDDt marked this conversation as resolved.
std::make_shared<PPublishFilterRequestV2>(task.request), std::move(query_ctx));
if (!request_attachment.empty()) {
closure->cntl()->request_attachment().append(request_attachment);
}
closure->cntl()->set_timeout_ms(timeout_ms);
if (config::execution_ignore_eovercrowded) {
closure->cntl()->ignore_eovercrowded();
}
stub->apply_filterv2(closure->cntl(), closure->request(), closure->response(), closure);
return Status::OK();
}

void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2& target,
PPublishFilterRequestV2* request) {
Comment thread
BiteTheDDDDt marked this conversation as resolved.
DORIS_CHECK(target.__isset.target_fragment_ids);
DORIS_CHECK(!target.target_fragment_ids.empty());
for (const auto& target_fragment_id : target.target_fragment_ids) {
request->add_fragment_ids(target_fragment_id);
}
}

} // namespace

std::vector<std::vector<RuntimeFilterPublishTarget>> split_runtime_filter_publish_targets(
const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
DORIS_CHECK(!targets.empty());
DORIS_CHECK(fanout > 0);
size_t slice_count = std::min(targets.size(), static_cast<size_t>(fanout));
std::vector<std::vector<RuntimeFilterPublishTarget>> slices;
slices.reserve(slice_count);
for (size_t offset = 0; offset < targets.size();) {
size_t remaining_targets = targets.size() - offset;
size_t remaining_slices = slice_count - slices.size();
size_t slice_size = (remaining_targets + remaining_slices - 1) / remaining_slices;
slices.emplace_back(targets.begin() + offset, targets.begin() + offset + slice_size);
offset += slice_size;
}
return slices;
}

std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
const PPublishFilterRequestV2& base_request,
const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
std::vector<RuntimeFilterPublishTask> tasks;
auto slices = split_runtime_filter_publish_targets(targets, fanout);
PPublishFilterRequestV2 request_template = base_request;
request_template.clear_fragment_ids();
request_template.clear_fragment_instance_ids();
request_template.clear_forward_targets();
tasks.reserve(slices.size());
for (const auto& slice : slices) {
DORIS_CHECK(!slice.empty());
RuntimeFilterPublishTask task;
task.receiver = slice.front();
task.request = request_template;

for (int32_t fragment_id : task.receiver.fragment_ids) {
task.request.add_fragment_ids(fragment_id);
}
for (size_t i = 1; i < slice.size(); ++i) {
DORIS_CHECK(!slice[i].fragment_ids.empty());
PPublishFilterForwardTarget* forward_target = task.request.add_forward_targets();
forward_target->mutable_target_addr()->CopyFrom(slice[i].addr);
for (int32_t fragment_id : slice[i].fragment_ids) {
forward_target->add_fragment_ids(fragment_id);
Comment thread
BiteTheDDDDt marked this conversation as resolved.
}
}
tasks.emplace_back(std::move(task));
}
return tasks;
}

int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count,
int64_t max_send_bytes) {
DORIS_CHECK(serialized_filter_size > 0);
DORIS_CHECK(max_send_bytes >= 0);
if (max_send_bytes == 0 || target_count <= 1) {
return 0;
}

const int64_t direct_target_limit = max_send_bytes / serialized_filter_size;
if (target_count <= static_cast<size_t>(direct_target_limit)) {
return 0;
}

return static_cast<int>(std::max<int64_t>(1, direct_target_limit));
}

Status forward_runtime_filter(const PPublishFilterRequestV2& request,
const butil::IOBuf& request_attachment,
std::weak_ptr<QueryContext> query_ctx) {
if (request.forward_targets().empty()) {
return Status::OK();
}

std::vector<RuntimeFilterPublishTarget> targets;
targets.reserve(request.forward_targets_size());
for (const auto& forward_target : request.forward_targets()) {
DORIS_CHECK(forward_target.has_target_addr());
DORIS_CHECK(forward_target.fragment_ids_size() > 0);
RuntimeFilterPublishTarget target;
target.addr.CopyFrom(forward_target.target_addr());
target.fragment_ids.assign(forward_target.fragment_ids().begin(),
forward_target.fragment_ids().end());
targets.emplace_back(std::move(target));
}

DORIS_CHECK(request.has_tree_publish_fanout());
int fanout = request.tree_publish_fanout();
DORIS_CHECK(fanout > 0);
DORIS_CHECK(request.has_publish_rpc_timeout_ms());
int timeout_ms = request.publish_rpc_timeout_ms();
auto tasks = build_runtime_filter_publish_tasks(request, targets, fanout);
VLOG_NOTICE << "Runtime filter relay publish filter_id=" << request.filter_id()
<< ", forward_targets=" << targets.size() << ", child_rpc_count=" << tasks.size()
<< ", fanout=" << fanout;
auto st = Status::OK();
for (const auto& task : tasks) {
Status rpc_st =
send_runtime_filter_relay_rpc(task, request_attachment, timeout_ms, query_ctx);
if (!rpc_st.ok()) {
Comment thread
BiteTheDDDDt marked this conversation as resolved.
LOG(WARNING) << "Failed to forward runtime filter, query_id="
<< print_id(request.query_id()) << ", filter_id=" << request.filter_id()
<< ", target=" << task.receiver.addr.hostname() << ":"
<< task.receiver.addr.port() << ", status=" << rpc_st;
st = std::move(rpc_st);
}
}
return st;
}

RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)
: _is_global(is_global),
_tracker(std::make_unique<MemTracker>(
Expand Down Expand Up @@ -410,16 +599,15 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
query_ctx->ignore_runtime_filter_error()
? std::weak_ptr<QueryContext> {}
: query_ctx,
merge_time, request->query_id(), query_ctx->execution_timeout());
merge_time, request->query_id(), query_ctx->execution_timeout(),
query_ctx->query_options());
}
return Status::OK();
}

Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val,
std::weak_ptr<QueryContext> ctx,
int64_t merge_time,
PUniqueId query_id,
int execution_timeout) {
Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(
GlobalMergeContext& cnt_val, std::weak_ptr<QueryContext> ctx, int64_t merge_time,
PUniqueId query_id, int execution_timeout, const TQueryOptions& query_options) {
if (cnt_val.targetv2_info.empty()) {
return Status::InternalError(
"_send_rf_to_target called with empty targetv2_info, filter: {}",
Expand Down Expand Up @@ -454,6 +642,66 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
}

std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val.targetv2_info;
int timeout_ms = get_execution_rpc_timeout_ms(execution_timeout);
apply_request.set_merge_time(merge_time);
*apply_request.mutable_query_id() = query_id;
const int64_t serialized_filter_size =
static_cast<int64_t>(apply_request.ByteSizeLong()) + std::max(0, len);
int fanout = 0;
if (query_options.__isset.runtime_filter_tree_publish_max_send_bytes) {
int64_t max_send_bytes = query_options.runtime_filter_tree_publish_max_send_bytes;
DORIS_CHECK(max_send_bytes >= 0);
fanout = calculate_tree_publish_fanout(serialized_filter_size, targets.size(),
max_send_bytes);
}
const bool use_tree_publish = fanout > 0 && targets.size() > static_cast<size_t>(fanout);

if (use_tree_publish) {
apply_request.set_tree_publish_fanout(fanout);
apply_request.set_publish_rpc_timeout_ms(timeout_ms);
auto publish_targets = build_runtime_filter_publish_targets(targets);
auto tasks = build_runtime_filter_publish_tasks(apply_request, publish_targets, fanout);
cnt_val.publish_callbacks.resize(tasks.size());
VLOG_NOTICE << "Runtime filter tree publish filter_id=" << apply_request.filter_id()
<< ", serialized_bytes=" << serialized_filter_size
<< ", target_count=" << targets.size() << ", child_rpc_count=" << tasks.size()
<< ", fanout=" << fanout;
auto st = Status::OK();
for (size_t i = 0; i < tasks.size(); ++i) {
const auto& task = tasks[i];
auto callback = HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(ctx);
cnt_val.publish_callbacks[i] = callback;
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
HandleErrorBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(task.request),
callback);

if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
}
closure->cntl_->set_timeout_ms(timeout_ms);
if (config::execution_ignore_eovercrowded) {
closure->cntl_->ignore_eovercrowded();
}

std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
task.receiver.addr));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to " << task.receiver.addr.hostname() << ":"
<< task.receiver.addr.port();
st = Status::InternalError("Failed to init rpc to {}:{}",
task.receiver.addr.hostname(),
task.receiver.addr.port());
continue;
}
stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
}
return st;
}

auto st = Status::OK();
cnt_val.publish_callbacks.resize(targets.size());
for (size_t i = 0; i < targets.size(); ++i) {
Expand All @@ -464,30 +712,16 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
HandleErrorBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request), callback);

closure->request_->set_merge_time(merge_time);
*closure->request_->mutable_query_id() = query_id;
if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
}

closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(execution_timeout));
closure->cntl_->set_timeout_ms(timeout_ms);
if (config::execution_ignore_eovercrowded) {
closure->cntl_->ignore_eovercrowded();
}

// set fragment-id
if (target.__isset.target_fragment_ids) {
for (auto& target_fragment_id : target.target_fragment_ids) {
closure->request_->add_fragment_ids(target_fragment_id);
}
} else {
// FE not upgraded yet.
for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
cur_id->set_hi(target_fragment_instance_id.hi);
cur_id->set_lo(target_fragment_instance_id.lo);
}
}
set_request_direct_publish_target(target, closure->request_.get());

std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
Expand Down
31 changes: 29 additions & 2 deletions be/src/exec/runtime_filter/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
Expand All @@ -35,8 +36,9 @@
#include "util/uid_util.h"

namespace butil {
class IOBuf;
class IOBufAsZeroCopyInputStream;
}
} // namespace butil

namespace doris {
class PPublishFilterRequestV2;
Expand All @@ -55,6 +57,30 @@ template <typename Response>
class HandleErrorBrpcCallback;
class SyncSizeCallback;

struct RuntimeFilterPublishTarget {
PNetworkAddress addr;
std::vector<int32_t> fragment_ids;
};

struct RuntimeFilterPublishTask {
RuntimeFilterPublishTarget receiver;
PPublishFilterRequestV2 request;
};

std::vector<std::vector<RuntimeFilterPublishTarget>> split_runtime_filter_publish_targets(
const std::vector<RuntimeFilterPublishTarget>& targets, int fanout);

std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
const PPublishFilterRequestV2& base_request,
const std::vector<RuntimeFilterPublishTarget>& targets, int fanout);

int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t target_count,
int64_t max_send_bytes);

Status forward_runtime_filter(const PPublishFilterRequestV2& request,
const butil::IOBuf& request_attachment,
std::weak_ptr<QueryContext> query_ctx);

struct LocalMergeContext {
std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
Expand Down Expand Up @@ -190,7 +216,8 @@ class RuntimeFilterMergeControllerEntity {
const int producer_size);

Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr<QueryContext> ctx,
int64_t merge_time, PUniqueId query_id, int execution_timeout);
int64_t merge_time, PUniqueId query_id, int execution_timeout,
const TQueryOptions& query_options);

// protect _filter_map
AnnotatedSharedMutex _filter_map_mutex;
Expand Down
Loading
Loading