Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ DEFINE_mBool(enable_warmup_immediately_on_new_rowset, "false");

// Packed file manager config
DEFINE_mBool(enable_packed_file, "true");
DEFINE_mBool(enable_file_cache_write_index_file_only, "false");
DEFINE_mInt64(packed_file_size_threshold_bytes, "5242880"); // 5MB
DEFINE_mInt64(packed_file_time_threshold_ms, "100"); // 100ms
DEFINE_mInt64(packed_file_try_lock_timeout_ms, "5"); // 5ms
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ DECLARE_mBool(enable_warmup_immediately_on_new_rowset);

// Packed file manager config
DECLARE_mBool(enable_packed_file);
DECLARE_mBool(enable_file_cache_write_index_file_only);
DECLARE_mInt64(packed_file_size_threshold_bytes);
DECLARE_mInt64(packed_file_time_threshold_ms);
DECLARE_mInt64(packed_file_try_lock_timeout_ms);
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,8 @@ DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

DEFINE_Bool(clear_file_cache, "false");
DEFINE_mBool(enable_file_cache_query_limit, "false");
// Whether segment footer and segment metadata count toward file cache query limit.
DEFINE_mBool(file_cache_query_limit_segment_meta, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,7 @@ DECLARE_String(file_cache_path);
DECLARE_Int64(file_cache_each_block_size);
DECLARE_Bool(clear_file_cache);
DECLARE_mBool(enable_file_cache_query_limit);
DECLARE_mBool(file_cache_query_limit_segment_meta);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_evict_file_cache_in_advance);
Expand Down
209 changes: 206 additions & 3 deletions be/src/exec/operator/materialization_opertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>

#include <set>
#include <sstream>
#include <utility>

#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "core/block/block.h"
#include "core/column/column.h"
Expand All @@ -31,9 +35,107 @@
#include "exec/scan/file_scanner.h"
#include "util/brpc_client_cache.h"
#include "util/brpc_closure.h"
#include "util/pretty_printer.h"

namespace doris {

namespace {

constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND =
"TopNLazyMaterializationSecondPhasePerBackend";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_ROWS_READ =
"TopNLazyMaterializationSecondPhasePerBackendRowsRead";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_SEGMENTS_READ =
"TopNLazyMaterializationSecondPhasePerBackendSegmentsRead";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_COUNT =
"TopNLazyMaterializationSecondPhasePerBackendLocalIOCount";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_BYTES =
"TopNLazyMaterializationSecondPhasePerBackendLocalIOBytes";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_COUNT =
"TopNLazyMaterializationSecondPhasePerBackendRemoteIOCount";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_BYTES =
"TopNLazyMaterializationSecondPhasePerBackendRemoteIOBytes";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_SKIP_CACHE_IO_COUNT =
"TopNLazyMaterializationSecondPhasePerBackendSkipCacheIOCount";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_WRITE_CACHE_BYTES =
"TopNLazyMaterializationSecondPhasePerBackendWriteCacheBytes";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_TIME =
"TopNLazyMaterializationSecondPhasePerBackendLocalIOTime";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_TIME =
"TopNLazyMaterializationSecondPhasePerBackendRemoteIOTime";
constexpr const char* TOPN_LAZY_MAT_PHASE2_PER_BACKEND_WRITE_CACHE_IO_TIME =
"TopNLazyMaterializationSecondPhasePerBackendWriteCacheIOTime";

void update_counter(RuntimeProfile* profile, const std::string& name, TUnit::type unit,
int64_t value) {
COUNTER_UPDATE(ADD_COUNTER_WITH_LEVEL(profile, name, unit, 2), value);
}

void update_topn_lazy_materialization_profile(RuntimeProfile* profile,
const PTopNLazyMaterializationFileCacheStats& stats) {
if (profile == nullptr) {
return;
}
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseLocalIOCount,
TUnit::UNIT, stats.local_io_count());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseLocalIOBytes,
TUnit::BYTES, stats.local_io_bytes());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseRemoteIOCount,
TUnit::UNIT, stats.remote_io_count());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseRemoteIOBytes,
TUnit::BYTES, stats.remote_io_bytes());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseSkipCacheIOCount,
TUnit::UNIT, stats.skip_cache_io_count());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseWriteCacheBytes,
TUnit::BYTES, stats.write_cache_bytes());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseLocalIOTime,
TUnit::TIME_NS, stats.local_io_time());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseRemoteIOTime,
TUnit::TIME_NS, stats.remote_io_time());
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseWriteCacheIOTime,
TUnit::TIME_NS, stats.write_cache_io_time());
}

int64_t count_request_rows(const PMultiGetRequestV2& request) {
int64_t rows = 0;
for (const auto& request_block_desc : request.request_block_descs()) {
rows += request_block_desc.row_id_size();
}
return rows;
}

int64_t count_request_segments(const PMultiGetRequestV2& request) {
std::set<uint32_t> file_ids;
for (const auto& request_block_desc : request.request_block_descs()) {
DCHECK_EQ(request_block_desc.file_id_size(), request_block_desc.row_id_size());
for (const auto file_id : request_block_desc.file_id()) {
file_ids.insert(file_id);
}
}
return file_ids.size();
}

template <typename AppendValue>
std::string format_array(size_t size, AppendValue append_value) {
std::stringstream values;
values << "[";
for (size_t i = 0; i < size; ++i) {
append_value(values, i);
values << ", ";
}
values << "]";
return values.str();
}

template <typename GetValue>
std::string format_counter_array(size_t size, TUnit::type unit, GetValue get_value) {
return format_array(size, [&](std::stringstream& values, size_t i) {
values << PrettyPrinter::print(static_cast<int64_t>(get_value(i)), unit);
});
}

} // namespace

void MaterializationSharedState::get_block(Block* block) {
for (int i = 0, j = 0, rowid_to_block_loc = rowid_locs[j]; i < origin_block.columns(); i++) {
if (i != rowid_to_block_loc) {
Expand All @@ -53,6 +155,98 @@ void MaterializationSharedState::get_block(Block* block) {
origin_block.clear();
}

void MaterializationSharedState::_update_topn_lazy_materialization_profile(
RuntimeProfile* profile) {
DORIS_CHECK(profile != nullptr);
for (const auto& [backend_id, rpc_struct] : rpc_struct_map) {
const int64_t rows_read = count_request_rows(rpc_struct.request);
const int64_t segments_read = count_request_segments(rpc_struct.request);
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseRowsRead,
TUnit::UNIT, rows_read);
update_counter(profile, RowIdStorageReader::TopNLazyMaterializationSecondPhaseSegmentsRead,
TUnit::UNIT, segments_read);

auto& stats = _topn_lazy_materialization_backend_stats[backend_id];
if (stats.backend.empty()) {
stats.backend = rpc_struct.backend_address.empty() ? fmt::format("id={}", backend_id)
: rpc_struct.backend_address;
}
stats.rows_read += rows_read;
stats.segments_read += segments_read;
if (!rpc_struct.response.has_topn_lazy_materialization_file_cache_stats()) {
continue;
}

const auto& file_cache_stats =
rpc_struct.response.topn_lazy_materialization_file_cache_stats();
update_topn_lazy_materialization_profile(profile, file_cache_stats);
stats.local_io_count += file_cache_stats.local_io_count();
stats.local_io_bytes += file_cache_stats.local_io_bytes();
stats.remote_io_count += file_cache_stats.remote_io_count();
stats.remote_io_bytes += file_cache_stats.remote_io_bytes();
stats.skip_cache_io_count += file_cache_stats.skip_cache_io_count();
stats.write_cache_bytes += file_cache_stats.write_cache_bytes();
stats.local_io_time += file_cache_stats.local_io_time();
stats.remote_io_time += file_cache_stats.remote_io_time();
stats.write_cache_io_time += file_cache_stats.write_cache_io_time();
}

std::vector<const TopNLazyMaterializationBackendStats*> stats;
stats.reserve(_topn_lazy_materialization_backend_stats.size());
for (const auto& [_, backend_stats] : _topn_lazy_materialization_backend_stats) {
stats.push_back(&backend_stats);
}

const size_t size = stats.size();
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND,
format_array(size, [&](std::stringstream& values, size_t i) {
values << stats[i]->backend;
}));
profile->add_info_string(
TOPN_LAZY_MAT_PHASE2_PER_BACKEND_ROWS_READ,
format_counter_array(size, TUnit::UNIT, [&](size_t i) { return stats[i]->rows_read; }));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_SEGMENTS_READ,
format_counter_array(size, TUnit::UNIT, [&](size_t i) {
return stats[i]->segments_read;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_COUNT,
format_counter_array(size, TUnit::UNIT, [&](size_t i) {
return stats[i]->local_io_count;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_BYTES,
format_counter_array(size, TUnit::BYTES, [&](size_t i) {
return stats[i]->local_io_bytes;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_COUNT,
format_counter_array(size, TUnit::UNIT, [&](size_t i) {
return stats[i]->remote_io_count;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_BYTES,
format_counter_array(size, TUnit::BYTES, [&](size_t i) {
return stats[i]->remote_io_bytes;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_SKIP_CACHE_IO_COUNT,
format_counter_array(size, TUnit::UNIT, [&](size_t i) {
return stats[i]->skip_cache_io_count;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_WRITE_CACHE_BYTES,
format_counter_array(size, TUnit::BYTES, [&](size_t i) {
return stats[i]->write_cache_bytes;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_LOCAL_IO_TIME,
format_counter_array(size, TUnit::TIME_NS, [&](size_t i) {
return stats[i]->local_io_time;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_REMOTE_IO_TIME,
format_counter_array(size, TUnit::TIME_NS, [&](size_t i) {
return stats[i]->remote_io_time;
}));
profile->add_info_string(TOPN_LAZY_MAT_PHASE2_PER_BACKEND_WRITE_CACHE_IO_TIME,
format_counter_array(size, TUnit::TIME_NS, [&](size_t i) {
return stats[i]->write_cache_io_time;
}));
}

// Merges RPC responses from multiple BEs into `response_blocks` in the original row order.
//
// After parallel multiget_data_v2 RPCs complete, each BE's response contains a partial block
Expand All @@ -63,7 +257,9 @@ void MaterializationSharedState::get_block(Block* block) {
// rpc_struct_map[backend_id].response (per-BE partial blocks, unordered across BEs)
// + block_order_results[i][j] (maps each output row → its source backend_id)
// → response_blocks[i] (final merged result in original TopN row order)
Status MaterializationSharedState::merge_multi_response() {
Status MaterializationSharedState::merge_multi_response(RuntimeProfile* profile) {
_update_topn_lazy_materialization_profile(profile);

// Outer loop: iterate over each relation (i.e., each rowid column / table).
// A query with lazy materialization on 2 tables would have block_order_results.size() == 2,
// each with its own set of response_blocks and RPC request_block_descs.
Expand Down Expand Up @@ -265,6 +461,9 @@ Status MaterializationSharedState::init_multi_requests(
// Initialize the base struct of PMultiGetRequestV2
multi_get_request.set_be_exec_version(state->be_exec_version());
multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
multi_get_request.set_file_cache_remote_only_on_miss(
config::is_cloud_mode() &&
state->query_options().enable_topn_lazy_mat_phase2_no_write_file_cache);
auto* query_id = multi_get_request.mutable_query_id();
query_id->set_hi(state->query_id().hi);
query_id->set_lo(state->query_id().lo);
Expand Down Expand Up @@ -315,7 +514,10 @@ Status MaterializationSharedState::init_multi_requests(
FetchRpcStruct {.stub = std::move(client),
.cntl = std::make_unique<brpc::Controller>(),
.request = multi_get_request,
.response = PMultiGetResponseV2()});
.response = PMultiGetResponseV2(),
.backend_address = fmt::format(
"id={} {}:{}", node_info.id, node_info.host,
node_info.async_internal_port)});
}

return Status::OK();
Expand Down Expand Up @@ -427,7 +629,8 @@ Status MaterializationOperator::push(RuntimeState* state, Block* in_block, bool

if (local_state._materialization_state.need_merge_block) {
SCOPED_TIMER(local_state._merge_response_timer);
RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response(
local_state.operator_profile()));
local_state._max_rows_per_backend_counter->set(
(int64_t)local_state._materialization_state._max_rows_per_backend);
}
Expand Down
27 changes: 26 additions & 1 deletion be/src/exec/operator/materialization_opertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

#include <stdint.h>

#include <map>
#include <string>
#include <unordered_map>

#include "common/status.h"
#include "exec/operator/operator.h"

Expand All @@ -32,6 +36,7 @@ struct FetchRpcStruct {
std::unique_ptr<brpc::Controller> cntl;
PMultiGetRequestV2 request;
PMultiGetResponseV2 response;
std::string backend_address;
};

struct MaterializationSharedState {
Expand All @@ -41,11 +46,27 @@ struct MaterializationSharedState {
Status init_multi_requests(const TMaterializationNode& tnode, RuntimeState* state);
Status create_muiltget_result(const Columns& columns, bool eos);

Status merge_multi_response();
Status merge_multi_response(RuntimeProfile* profile);
void get_block(Block* block);

private:
void _update_profile_info(int64_t backend_id, RuntimeProfile* response_profile);
void _update_topn_lazy_materialization_profile(RuntimeProfile* profile);

struct TopNLazyMaterializationBackendStats {
std::string backend;
int64_t rows_read = 0;
int64_t segments_read = 0;
int64_t local_io_count = 0;
int64_t local_io_bytes = 0;
int64_t remote_io_count = 0;
int64_t remote_io_bytes = 0;
int64_t skip_cache_io_count = 0;
int64_t write_cache_bytes = 0;
int64_t local_io_time = 0;
int64_t remote_io_time = 0;
int64_t write_cache_io_time = 0;
};

public:
bool rpc_struct_inited = false;
Expand All @@ -68,6 +89,10 @@ struct MaterializationSharedState {
uint32_t _max_rows_per_backend = 0;
// Store the number of rows processed by each backend
std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id => rows_count

private:
// backend id => accumulated TopN phase-2 profile stats.
std::map<int64_t, TopNLazyMaterializationBackendStats> _topn_lazy_materialization_backend_stats;
};

class MaterializationLocalState final : public PipelineXLocalState<FakeSharedState> {
Expand Down
Loading
Loading