From 4c29fa99fe44e666e5597e377f03b98b1db4bfe4 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 16 Mar 2026 20:20:03 +0800 Subject: [PATCH 1/3] [Enhancement](compaction) add information_schema.be_compaction_tasks system table (#48893) Add a new system table `be_compaction_tasks` in `information_schema` that exposes compaction task metadata across all BEs, covering PENDING, RUNNING, FINISHED, and FAILED states. Key components: - CompactionTaskTracker: singleton that tracks compaction tasks across their full lifecycle (PENDING -> RUNNING -> FINISHED/FAILED) - SchemaCompactionTasksScanner: BE scanner that fills 30 columns including identification, timing, input/output stats, IO stats, and resource usage - FE schema registration with BackendPartitionedSchemaScanNode for multi-BE fan-out Tracker is integrated at all compaction entry points: - Local: base/cumu/full, single-replica, cold-data, manual HTTP trigger - Cloud: base/cumu/full, manual HTTP trigger, index-change Closes #48893 --- be/src/cloud/cloud_base_compaction.h | 2 + be/src/cloud/cloud_compaction_action.cpp | 4 +- be/src/cloud/cloud_cumulative_compaction.h | 2 + be/src/cloud/cloud_full_compaction.h | 2 + be/src/cloud/cloud_index_change_compaction.h | 3 + be/src/cloud/cloud_storage_engine.cpp | 130 +++++- be/src/cloud/cloud_storage_engine.h | 12 +- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + .../schema_compaction_tasks_scanner.cpp | 418 ++++++++++++++++++ .../schema_compaction_tasks_scanner.h | 51 +++ be/src/information_schema/schema_scanner.cpp | 3 + .../service/http/action/compaction_action.cpp | 38 +- be/src/storage/compaction/base_compaction.h | 1 + .../storage/compaction/cold_data_compaction.h | 1 + be/src/storage/compaction/compaction.cpp | 109 ++++- be/src/storage/compaction/compaction.h | 19 + .../compaction/compaction_task_tracker.cpp | 231 ++++++++++ .../compaction/compaction_task_tracker.h | 163 +++++++ .../compaction/cumulative_compaction.h | 1 + 
be/src/storage/compaction/full_compaction.h | 1 + .../compaction/single_replica_compaction.h | 1 + be/src/storage/olap_server.cpp | 109 ++++- be/src/storage/storage_engine.h | 7 +- .../task/engine_cloud_index_change_task.cpp | 28 ++ be/test/cloud/cloud_compaction_test.cpp | 2 + .../compaction_task_tracker_test.cpp | 413 +++++++++++++++++ .../doris/analysis/SchemaTableType.java | 2 + .../org/apache/doris/catalog/SchemaTable.java | 32 ++ .../BackendPartitionedSchemaScanNode.java | 2 + gensrc/thrift/Descriptors.thrift | 1 + .../test_be_compaction_tasks.groovy | 110 +++++ 32 files changed, 1860 insertions(+), 42 deletions(-) create mode 100644 be/src/information_schema/schema_compaction_tasks_scanner.cpp create mode 100644 be/src/information_schema/schema_compaction_tasks_scanner.h create mode 100644 be/src/storage/compaction/compaction_task_tracker.cpp create mode 100644 be/src/storage/compaction/compaction_task_tracker.h create mode 100644 be/test/storage/compaction/compaction_task_tracker_test.cpp create mode 100644 regression-test/suites/compaction/test_be_compaction_tasks.groovy diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index c89047b919beb3..cbb4d0516ad256 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -35,11 +35,13 @@ class CloudBaseCompaction : public CloudCompactionMixin { Status request_global_lock(); void do_lease(); + CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; } private: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudBaseCompaction"; } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; diff --git a/be/src/cloud/cloud_compaction_action.cpp b/be/src/cloud/cloud_compaction_action.cpp index d9a7794edca785..0be27c373208d9 100644 --- a/be/src/cloud/cloud_compaction_action.cpp +++ 
b/be/src/cloud/cloud_compaction_action.cpp @@ -30,7 +30,6 @@ #include "absl/strings/substitute.h" #include "cloud/cloud_base_compaction.h" -#include "cloud/cloud_compaction_action.h" #include "cloud/cloud_cumulative_compaction.h" #include "cloud/cloud_full_compaction.h" #include "cloud/cloud_tablet.h" @@ -171,7 +170,8 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION : compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION - : CompactionType::FULL_COMPACTION)); + : CompactionType::FULL_COMPACTION, + TriggerMethod::MANUAL)); LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id << " table id: " << table_id; diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 174d0d57a97cc7..1fc3da8ccb89f3 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -40,11 +40,13 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { int64_t get_input_rowsets_bytes() const { return _input_rowsets_total_size; } int64_t get_input_num_rows() const { return _input_row_num; } + CompactionProfileType profile_type() const override { return CompactionProfileType::CUMULATIVE; } private: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudCumulativeCompaction"; } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index e5c440e52b9b8a..04b471fd13d10e 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -36,11 +36,13 @@ class CloudFullCompaction : public CloudCompactionMixin { Status request_global_lock(); void do_lease(); + CompactionProfileType profile_type() const 
override { return CompactionProfileType::FULL; } protected: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudFullCompaction"; } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; Status garbage_collection() override; diff --git a/be/src/cloud/cloud_index_change_compaction.h b/be/src/cloud/cloud_index_change_compaction.h index e0bd7952ca7568..aa87b2f4ba03e1 100644 --- a/be/src/cloud/cloud_index_change_compaction.h +++ b/be/src/cloud/cloud_index_change_compaction.h @@ -45,6 +45,8 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin { bool is_base_compaction() const { return _compact_type == cloud::TabletCompactionJobPB::BASE; } + CompactionProfileType profile_type() const override { return CompactionProfileType::INDEX_CHANGE; } + Status rebuild_tablet_schema() override; private: @@ -55,6 +57,7 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin { protected: std::string_view compaction_name() const override { return "CloudIndexChangeCompaction"; } + int64_t input_segments_num() const override { return _input_segments; } // if cumu rowset is modified, cumu compaction should sync rowset before execute. // if base rowset is modified, base compaction should sync rowset before execute. 
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 432c1fde72fbc5..9d84d5a0354e30 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -56,6 +56,7 @@ #include "io/hdfs_util.h" #include "io/io_common.h" #include "load/memtable/memtable_flush_executor.h" +#include "service/backend_options.h" #include "runtime/memory/cache_manager.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" @@ -793,7 +794,8 @@ Status CloudStorageEngine::_request_tablet_global_compaction_lock( } } -Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet, + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -816,6 +818,29 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t _submitted_base_compactions.erase(tablet->tablet_id()); return st; } + + // Register task as PENDING in the tracker after successful prepare + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = trigger_method; + info.compaction_score = tablet->get_cloud_base_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + 
info.input_version_range = compaction->input_version_range_str(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + { std::lock_guard lock(_compaction_mtx); _submitted_base_compactions[tablet->tablet_id()] = compaction; @@ -836,13 +861,24 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, compaction); - if (!st.ok()) return; + if (!st.ok()) { + CompactionTaskTracker::instance()->remove_task(compaction_id); + return; + } + // Update tracker to RUNNING when execution starts + { + RunningStats stats; + stats.start_time_ms = UnixMillis(); + stats.is_vertical = compaction->is_vertical(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, stats); + } st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast(system_clock::now().time_since_epoch()).count(); tablet->set_last_base_compaction_failure_time(now); } + // submit_profile_record() inside execute_compact() handles complete/fail in tracker std::lock_guard lock(_compaction_mtx); _executing_base_compactions.erase(tablet->tablet_id()); }); @@ -851,13 +887,15 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t if (!st.ok()) { std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); + CompactionTaskTracker::instance()->remove_task(compaction_id); return Status::InternalError("failed to submit base compaction, tablet_id={}", tablet->tablet_id()); } return st; } -Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -890,6 +928,29 @@ Status 
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); return st; } + + // Register task as PENDING in the tracker after successful prepare + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = trigger_method; + info.compaction_score = tablet->get_cloud_cumu_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + info.input_version_range = compaction->input_version_range_str(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + { std::lock_guard lock(_compaction_mtx); _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); @@ -952,7 +1013,17 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, tablet, compaction); - if (!st.ok()) return; + if (!st.ok()) { + CompactionTaskTracker::instance()->remove_task(compaction_id); + return; + } + // Update tracker to RUNNING when execution starts + { + RunningStats stats; + stats.start_time_ms = UnixMillis(); + stats.is_vertical = compaction->is_vertical(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, stats); + } do { std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads++; @@ -989,6 +1060,7 @@ Status 
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS .tag("remaining threads", _cumu_compaction_thread_pool_used_threads) .tag("small_tasks_running", _cumu_compaction_thread_pool_small_tasks_running); + CompactionTaskTracker::instance()->remove_task(compaction_id); return; } } @@ -1005,13 +1077,15 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS _cumu_compaction_thread_pool->get_queue_size()); if (!st.ok()) { erase_submitted_cumu_compaction(); + CompactionTaskTracker::instance()->remove_task(compaction_id); return Status::InternalError("failed to submit cumu compaction, tablet_id={}", tablet->tablet_id()); } return st; } -Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet, + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -1033,6 +1107,29 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t _submitted_full_compactions.erase(tablet->tablet_id()); return st; } + + // Register task as PENDING in the tracker after successful prepare + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = trigger_method; + info.compaction_score = tablet->get_cloud_base_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = 
compaction->input_segments_num_value(); + info.input_version_range = compaction->input_version_range_str(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + { std::lock_guard lock(_compaction_mtx); _submitted_full_compactions[tablet->tablet_id()] = compaction; @@ -1047,19 +1144,31 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, compaction); - if (!st.ok()) return; + if (!st.ok()) { + CompactionTaskTracker::instance()->remove_task(compaction_id); + return; + } + // Update tracker to RUNNING when execution starts + { + RunningStats stats; + stats.start_time_ms = UnixMillis(); + stats.is_vertical = compaction->is_vertical(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, stats); + } st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast(system_clock::now().time_since_epoch()).count(); tablet->set_last_full_compaction_failure_time(now); } + // submit_profile_record() inside execute_compact() handles complete/fail in tracker std::lock_guard lock(_compaction_mtx); _executing_full_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { std::lock_guard lock(_compaction_mtx); _submitted_full_compactions.erase(tablet->tablet_id()); + CompactionTaskTracker::instance()->remove_task(compaction_id); return Status::InternalError("failed to submit full compaction, tablet_id={}", tablet->tablet_id()); } @@ -1067,19 +1176,20 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t } Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, - CompactionType compaction_type) { + CompactionType compaction_type, + TriggerMethod trigger_method) { DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || compaction_type == CompactionType::BASE_COMPACTION || compaction_type == 
CompactionType::FULL_COMPACTION); switch (compaction_type) { case CompactionType::BASE_COMPACTION: - RETURN_IF_ERROR(_submit_base_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method)); return Status::OK(); case CompactionType::CUMULATIVE_COMPACTION: - RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method)); return Status::OK(); case CompactionType::FULL_COMPACTION: - RETURN_IF_ERROR(_submit_full_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method)); return Status::OK(); default: return Status::InternalError("unknown compaction type!"); diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 7d0a3e6129645f..20e90142a2ca98 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -133,7 +133,8 @@ class CloudStorageEngine final : public BaseStorageEngine { void get_cumu_compaction(int64_t tablet_id, std::vector>& res); - Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type); + Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type, + TriggerMethod trigger_method = TriggerMethod::BACKGROUND); Status get_compaction_status_json(std::string* result); @@ -199,9 +200,12 @@ class CloudStorageEngine final : public BaseStorageEngine { std::vector _generate_cloud_compaction_tasks(CompactionType compaction_type, bool check_score); Status _adjust_compaction_thread_num(); - Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); - Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); - Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); + Status _submit_base_compaction_task(const CloudTabletSPtr& tablet, + TriggerMethod trigger_method); + Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, + 
TriggerMethod trigger_method); + Status _submit_full_compaction_task(const CloudTabletSPtr& tablet, + TriggerMethod trigger_method); Status _request_tablet_global_compaction_lock(ReaderType compaction_type, const CloudTabletSPtr& tablet, std::shared_ptr compaction); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c28f1f8c33d641..3b1141805f34c1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -436,6 +436,8 @@ DEFINE_Bool(enable_low_cardinality_cache_code, "true"); DEFINE_mBool(enable_compaction_checksum, "false"); // whether disable automatic compaction task DEFINE_mBool(disable_auto_compaction, "false"); +// max number of compaction task records to keep in memory, 0 to disable +DEFINE_mInt32(compaction_profile_max_records, "500"); // whether enable vertical compaction DEFINE_mBool(enable_vertical_compaction, "true"); // whether enable ordered data compaction diff --git a/be/src/common/config.h b/be/src/common/config.h index bb122a25598c00..97ad4e1ad5d71e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -501,6 +501,8 @@ DECLARE_Bool(enable_low_cardinality_cache_code); DECLARE_mBool(enable_compaction_checksum); // whether disable automatic compaction task DECLARE_mBool(disable_auto_compaction); +// max number of compaction task records to keep in memory, 0 to disable +DECLARE_mInt32(compaction_profile_max_records); // whether enable vertical compaction DECLARE_mBool(enable_vertical_compaction); // whether enable ordered data compaction diff --git a/be/src/information_schema/schema_compaction_tasks_scanner.cpp b/be/src/information_schema/schema_compaction_tasks_scanner.cpp new file mode 100644 index 00000000000000..57e47ce53bd664 --- /dev/null +++ b/be/src/information_schema/schema_compaction_tasks_scanner.cpp @@ -0,0 +1,418 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "information_schema/schema_compaction_tasks_scanner.h" + +#include + +#include +#include +#include + +#include "common/status.h" +#include "core/data_type/define_primitive_type.h" +#include "core/string_ref.h" +#include "runtime/runtime_state.h" +#include "service/backend_options.h" +#include "storage/compaction/compaction_task_tracker.h" +#include "util/time.h" + +namespace doris { +class Block; + +#include "common/compile_check_begin.h" + +std::vector SchemaCompactionTasksScanner::_s_tbls_columns = { + // name, type, size, is_null + {"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"COMPACTION_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"TABLE_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"PARTITION_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"COMPACTION_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STATUS", TYPE_VARCHAR, sizeof(StringRef), true}, + {"TRIGGER_METHOD", TYPE_VARCHAR, sizeof(StringRef), true}, + {"COMPACTION_SCORE", TYPE_BIGINT, sizeof(int64_t), true}, + {"SCHEDULED_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"START_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"END_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"ELAPSED_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true}, + 
{"INPUT_ROWSETS_COUNT", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_ROW_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_DATA_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_SEGMENTS_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_VERSION_RANGE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"MERGED_ROWS", TYPE_BIGINT, sizeof(int64_t), true}, + {"FILTERED_ROWS", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_ROW_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_DATA_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_SEGMENTS_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_VERSION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"BYTES_READ_FROM_LOCAL", TYPE_BIGINT, sizeof(int64_t), true}, + {"BYTES_READ_FROM_REMOTE", TYPE_BIGINT, sizeof(int64_t), true}, + {"PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), true}, + {"IS_VERTICAL", TYPE_BOOLEAN, sizeof(bool), true}, + {"PERMITS", TYPE_BIGINT, sizeof(int64_t), true}, + {"STATUS_MSG", TYPE_VARCHAR, sizeof(StringRef), true}, +}; + +SchemaCompactionTasksScanner::SchemaCompactionTasksScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BE_COMPACTION_TASKS), + backend_id_(0), + _tasks_idx(0) {}; + +Status SchemaCompactionTasksScanner::start(RuntimeState* state) { + if (!_is_init) { + return Status::InternalError("used before initialized."); + } + backend_id_ = state->backend_id(); + _tasks = CompactionTaskTracker::instance()->get_all_tasks(); + return Status::OK(); +} + +Status SchemaCompactionTasksScanner::get_next_block_internal(Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_tasks_idx >= _tasks.size()) { + *eos = true; + return Status::OK(); + } + *eos = false; + return _fill_block_impl(block); +} + +Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { + SCOPED_TIMER(_fill_block_timer); 
+ size_t fill_tasks_num = std::min(1000UL, _tasks.size() - _tasks_idx); + size_t fill_idx_begin = _tasks_idx; + size_t fill_idx_end = _tasks_idx + fill_tasks_num; + std::vector datas(fill_tasks_num); + + // col 0: BACKEND_ID + { + int64_t src = backend_id_; + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + datas[i - fill_idx_begin] = &src; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas)); + } + // col 1: COMPACTION_ID + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].compaction_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas)); + } + // col 2: TABLE_ID + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].table_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 2, datas)); + } + // col 3: PARTITION_ID + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].partition_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 3, datas)); + } + // col 4: TABLET_ID + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].tablet_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 4, datas)); + } + // col 5: COMPACTION_TYPE + { + std::vector strs_storage(fill_tasks_num); + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs_storage[i - fill_idx_begin] = to_string(_tasks[i].compaction_type); + strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), + 
strs_storage[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 5, datas)); + } + // col 6: STATUS + { + std::vector strs_storage(fill_tasks_num); + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs_storage[i - fill_idx_begin] = to_string(_tasks[i].status); + strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), + strs_storage[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 6, datas)); + } + // col 7: TRIGGER_METHOD + { + std::vector strs_storage(fill_tasks_num); + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs_storage[i - fill_idx_begin] = to_string(_tasks[i].trigger_method); + strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), + strs_storage[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 7, datas)); + } + // col 8: COMPACTION_SCORE + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].compaction_score; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 8, datas)); + } + // col 9: SCHEDULED_TIME + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].scheduled_time_ms; + if (ts_ms > 0) { + srcs[i - fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 9, datas)); + } + // col 10: START_TIME + { + std::vector 
srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].start_time_ms; + if (ts_ms > 0) { + srcs[i - fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, datas)); + } + // col 11: END_TIME + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].end_time_ms; + if (ts_ms > 0) { + srcs[i - fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas)); + } + // col 12: ELAPSED_TIME_MS + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + const auto& task = _tasks[i]; + if (task.status == CompactionTaskStatus::RUNNING) { + srcs[i - fill_idx_begin] = UnixMillis() - task.start_time_ms; + } else if (task.status == CompactionTaskStatus::FINISHED || + task.status == CompactionTaskStatus::FAILED) { + srcs[i - fill_idx_begin] = task.end_time_ms - task.start_time_ms; + } else { + srcs[i - fill_idx_begin] = 0; + } + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas)); + } + // col 13: INPUT_ROWSETS_COUNT + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_rowsets_count; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas)); + } + // col 14: INPUT_ROW_NUM + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_row_num; + 
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 14, datas)); + } + // col 15: INPUT_DATA_SIZE + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_data_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 15, datas)); + } + // col 16: INPUT_SEGMENTS_NUM + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_segments_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 16, datas)); + } + // col 17: INPUT_VERSION_RANGE + { + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = StringRef(_tasks[i].input_version_range.c_str(), + _tasks[i].input_version_range.size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas)); + } + // col 18: MERGED_ROWS + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].merged_rows; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, datas)); + } + // col 19: FILTERED_ROWS + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].filtered_rows; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 19, datas)); + } + // col 20: OUTPUT_ROW_NUM + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_row_num; + datas[i - 
fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 20, datas)); + } + // col 21: OUTPUT_DATA_SIZE + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_data_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 21, datas)); + } + // col 22: OUTPUT_SEGMENTS_NUM + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_segments_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 22, datas)); + } + // col 23: OUTPUT_VERSION + { + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = StringRef(_tasks[i].output_version.c_str(), + _tasks[i].output_version.size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 23, datas)); + } + // col 24: BYTES_READ_FROM_LOCAL + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].bytes_read_from_local; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 24, datas)); + } + // col 25: BYTES_READ_FROM_REMOTE + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].bytes_read_from_remote; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 25, datas)); + } + // col 26: PEAK_MEMORY_BYTES + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = 
_tasks[i].peak_memory_bytes; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 26, datas)); + } + // col 27: IS_VERTICAL (use uint8_t because vector is bitset, data() is deleted) + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].is_vertical ? 1 : 0; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 27, datas)); + } + // col 28: PERMITS + { + std::vector srcs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].permits; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 28, datas)); + } + // col 29: STATUS_MSG + { + std::vector strs(fill_tasks_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = StringRef(_tasks[i].status_msg.c_str(), + _tasks[i].status_msg.size()); + datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 29, datas)); + } + + _tasks_idx += fill_tasks_num; + return Status::OK(); +} +} // namespace doris diff --git a/be/src/information_schema/schema_compaction_tasks_scanner.h b/be/src/information_schema/schema_compaction_tasks_scanner.h new file mode 100644 index 00000000000000..418859539b133d --- /dev/null +++ b/be/src/information_schema/schema_compaction_tasks_scanner.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "common/status.h" +#include "information_schema/schema_scanner.h" +#include "storage/compaction/compaction_task_tracker.h" + +namespace doris { +class RuntimeState; + +class Block; + +class SchemaCompactionTasksScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaCompactionTasksScanner); + +public: + SchemaCompactionTasksScanner(); + ~SchemaCompactionTasksScanner() override = default; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(Block* block, bool* eos) override; + +private: + Status _fill_block_impl(Block* block); + + static std::vector _s_tbls_columns; + int64_t backend_id_ = 0; + size_t _tasks_idx = 0; + std::vector _tasks; +}; +} // namespace doris diff --git a/be/src/information_schema/schema_scanner.cpp b/be/src/information_schema/schema_scanner.cpp index a03c40b28c1ed4..aeb0b69dc3348c 100644 --- a/be/src/information_schema/schema_scanner.cpp +++ b/be/src/information_schema/schema_scanner.cpp @@ -48,6 +48,7 @@ #include "information_schema/schema_backend_kerberos_ticket_cache.h" #include "information_schema/schema_catalog_meta_cache_stats_scanner.h" #include "information_schema/schema_charsets_scanner.h" +#include "information_schema/schema_compaction_tasks_scanner.h" #include "information_schema/schema_cluster_snapshot_properties_scanner.h" #include "information_schema/schema_cluster_snapshots_scanner.h" #include "information_schema/schema_collations_scanner.h" @@ -233,6 +234,8 @@ std::unique_ptr 
SchemaScanner::create(TSchemaTableType::type type return SchemaBackendWorkloadGroupResourceUsage::create_unique(); case TSchemaTableType::SCH_TABLE_PROPERTIES: return SchemaTablePropertiesScanner::create_unique(); + case TSchemaTableType::SCH_BE_COMPACTION_TASKS: + return SchemaCompactionTasksScanner::create_unique(); case TSchemaTableType::SCH_DATABASE_PROPERTIES: return SchemaDatabasePropertiesScanner::create_unique(); case TSchemaTableType::SCH_FILE_CACHE_STATISTICS: diff --git a/be/src/service/http/action/compaction_action.cpp b/be/src/service/http/action/compaction_action.cpp index fc49a74954d902..40ba4cef48b5d2 100644 --- a/be/src/service/http/action/compaction_action.cpp +++ b/be/src/service/http/action/compaction_action.cpp @@ -32,11 +32,13 @@ #include "common/logging.h" #include "common/metrics/doris_metrics.h" #include "common/status.h" +#include "service/backend_options.h" #include "service/http/http_channel.h" #include "service/http/http_headers.h" #include "service/http/http_request.h" #include "service/http/http_status.h" #include "storage/compaction/base_compaction.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/compaction/cumulative_compaction.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" @@ -46,6 +48,7 @@ #include "storage/storage_engine.h" #include "storage/tablet/tablet_manager.h" #include "util/stopwatch.hpp" +#include "util/time.h" namespace doris { using namespace ErrorCode; @@ -154,8 +157,9 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); for (const auto& tablet : tablet_vec) { tablet->set_last_full_compaction_schedule_time(UnixMillis()); - RETURN_IF_ERROR( - _engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false)); + RETURN_IF_ERROR(_engine.submit_compaction_task( + tablet, 
CompactionType::FULL_COMPACTION, false, /*eager=*/true, + TriggerMethod::MANUAL)); } } else { // 2. fetch the tablet by tablet_id @@ -169,7 +173,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j } DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", { RETURN_IF_ERROR(_engine.submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, false)); + tablet, CompactionType::CUMULATIVE_COMPACTION, false, /*eager=*/true, + TriggerMethod::MANUAL)); LOG(INFO) << "Manual debug compaction task is successfully triggered"; *json_result = R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" + @@ -303,8 +308,33 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, tablet->set_cumulative_compaction_policy(cumulative_compaction_policy); } Status res = Status::OK(); - auto do_compact = [](Compaction& compaction) { + // Helper to register a compaction task as RUNNING in the tracker (direct execution, MANUAL trigger) + auto register_running_task = [&tablet](Compaction& compaction) { + CompactionTaskInfo info; + info.compaction_id = compaction.compaction_id(); + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction.profile_type(); + info.status = CompactionTaskStatus::RUNNING; + info.trigger_method = TriggerMethod::MANUAL; + info.compaction_score = tablet->get_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.start_time_ms = UnixMillis(); + info.input_rowsets_count = compaction.input_rowsets_count(); + info.input_row_num = compaction.input_row_num(); + info.input_data_size = compaction.input_rowsets_data_size(); + info.input_segments_num = compaction.input_segments_num_value(); + info.input_version_range = compaction.input_version_range_str(); + info.is_vertical = 
compaction.is_vertical(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + }; + auto do_compact = [&register_running_task](Compaction& compaction) { RETURN_IF_ERROR(compaction.prepare_compact()); + register_running_task(compaction); + // submit_profile_record() inside execute_compact() handles both + // success (complete) and failure (fail) tracker updates. return compaction.execute_compact(); }; if (compaction_type == PARAM_COMPACTION_BASE) { diff --git a/be/src/storage/compaction/base_compaction.h b/be/src/storage/compaction/base_compaction.h index 453583f8227abf..c01fc5e16e154c 100644 --- a/be/src/storage/compaction/base_compaction.h +++ b/be/src/storage/compaction/base_compaction.h @@ -43,6 +43,7 @@ class BaseCompaction final : public CompactionMixin { private: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "base compaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; } ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } diff --git a/be/src/storage/compaction/cold_data_compaction.h b/be/src/storage/compaction/cold_data_compaction.h index 94ee993a3060d5..1332b3e17239d6 100644 --- a/be/src/storage/compaction/cold_data_compaction.h +++ b/be/src/storage/compaction/cold_data_compaction.h @@ -32,6 +32,7 @@ class ColdDataCompaction final : public CompactionMixin { Status prepare_compact() override; Status execute_compact() override; + CompactionProfileType profile_type() const override { return CompactionProfileType::COLD_DATA; } private: std::string_view compaction_name() const override { return "cold data compaction"; } diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 698c81f7849fd0..e6af7a1ea5f132 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -17,6 +17,8 @@ #include "storage/compaction/compaction.h"
+#include "storage/compaction/compaction_task_tracker.h" + #include #include #include @@ -157,6 +159,7 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label) : _mem_tracker( MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label)), _tablet(std::move(tablet)), + _compaction_id(CompactionTaskTracker::instance()->next_compaction_id()), _is_vertical(config::enable_vertical_compaction), _allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction), _enable_vertical_compact_variant_subcolumns( @@ -177,6 +180,45 @@ Compaction::~Compaction() { _rowid_conversion.reset(); } +void Compaction::submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg) { + CompletionStats stats; + // Fill input_version_range from input rowsets + if (!_input_rowsets.empty()) { + stats.input_version_range = + fmt::format("[{}-{}]", _input_rowsets.front()->start_version(), + _input_rowsets.back()->end_version()); + } + stats.end_time_ms = UnixMillis(); + stats.merged_rows = _stats.merged_rows; + stats.filtered_rows = _stats.filtered_rows; + stats.bytes_read_from_local = _stats.bytes_read_from_local; + stats.bytes_read_from_remote = _stats.bytes_read_from_remote; + stats.peak_memory_bytes = _mem_tracker ? 
_mem_tracker->peak_consumption() : 0; + + if (_output_rowset) { + stats.output_row_num = _output_rowset->num_rows(); + stats.output_data_size = _output_rowset->data_disk_size(); + stats.output_segments_num = _output_rowset->num_segments(); + stats.output_version = _output_version.to_string(); + } + + auto* tracker = CompactionTaskTracker::instance(); + if (success) { + tracker->complete(_compaction_id, stats); + } else { + tracker->fail(_compaction_id, stats, status_msg); + } +} + +std::string Compaction::input_version_range_str() const { + if (_input_rowsets.empty()) { + return ""; + } + return fmt::format("[{}-{}]", _input_rowsets.front()->start_version(), + _input_rowsets.back()->end_version()); +} + void Compaction::init_profile(const std::string& label) { _profile = std::make_unique(label); @@ -533,13 +575,18 @@ bool CompactionMixin::handle_ordered_data_compaction() { } Status CompactionMixin::execute_compact() { + int64_t profile_start_time_ms = UnixMillis(); uint32_t checksum_before; uint32_t checksum_after; bool enable_compaction_checksum = config::enable_compaction_checksum; if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_before); - RETURN_IF_ERROR(checksum_task.execute()); + auto checksum_before_st = checksum_task.execute(); + if (!checksum_before_st.ok()) { + submit_profile_record(false, profile_start_time_ms, checksum_before_st.to_string()); + return checksum_before_st; + } } auto* data_dir = tablet()->data_dir(); @@ -547,26 +594,55 @@ Status CompactionMixin::execute_compact() { data_dir->disks_compaction_score_increment(permits); data_dir->disks_compaction_num_increment(1); - auto record_compaction_stats = [&](const doris::Exception& ex) { + // Use a wrapper to capture the actual Status message on failure. + // HANDLE_EXCEPTION_IF_CATCH_EXCEPTION returns early on failure (non-OK or exception). 
+ // For non-OK Status, the macro passes a default-constructed Exception with empty what(). + // For thrown exceptions, ex.what() has the real message. + Status impl_status; + auto on_failure = [&](const doris::Exception& ex) { _tablet->compaction_count.fetch_add(1, std::memory_order_relaxed); data_dir->disks_compaction_score_increment(-permits); data_dir->disks_compaction_num_increment(-1); }; - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits), record_compaction_stats); - record_compaction_stats(doris::Exception()); + HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( + ({ + impl_status = execute_compact_impl(permits); + impl_status; + }), + ([&](const doris::Exception& ex) { + on_failure(ex); + // Use the captured Status message if Exception has no message + std::string msg = ex.what(); + if (msg.empty() && !impl_status.ok()) { + msg = impl_status.to_string(); + } + submit_profile_record(false, profile_start_time_ms, msg); + })); + // Only reached on success + _tablet->compaction_count.fetch_add(1, std::memory_order_relaxed); + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_after); - RETURN_IF_ERROR(checksum_task.execute()); + auto checksum_st = checksum_task.execute(); + if (!checksum_st.ok()) { + submit_profile_record(false, profile_start_time_ms, checksum_st.to_string()); + return checksum_st; + } if (checksum_before != checksum_after) { - return Status::InternalError( + auto st = Status::InternalError( "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", checksum_before, checksum_after, _tablet->tablet_id()); + submit_profile_record(false, profile_start_time_ms, st.to_string()); + return st; } } + submit_profile_record(true, profile_start_time_ms); + 
DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num); DorisMetrics::instance()->local_compaction_read_bytes_total->increment( _input_rowsets_total_size); @@ -1671,22 +1747,31 @@ size_t CloudCompactionMixin::apply_txn_size_truncation_and_log(const std::string } Status CloudCompactionMixin::execute_compact() { + int64_t profile_start_time_ms = UnixMillis(); TEST_INJECTION_POINT("Compaction::do_compaction"); int64_t permits = get_compaction_permits(); + Status impl_status; HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( - execute_compact_impl(permits), [&](const doris::Exception& ex) { + ({ + impl_status = execute_compact_impl(permits); + impl_status; + }), + ([&](const doris::Exception& ex) { auto st = garbage_collection(); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write() && !st.ok()) { - // if compaction fail, be will try to abort compaction, and delete bitmap lock - // will release if abort job successfully, but if abort failed, delete bitmap - // lock will not release, in this situation, be need to send this rpc to ms - // to try to release delete bitmap lock. 
_engine.meta_mgr().remove_delete_bitmap_update_lock( _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), _tablet->tablet_id()); } - }); + std::string msg = ex.what(); + if (msg.empty() && !impl_status.ok()) { + msg = impl_status.to_string(); + } + submit_profile_record(false, profile_start_time_ms, msg); + })); + // Only reached on success + submit_profile_record(true, profile_start_time_ms); DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num); DorisMetrics::instance()->remote_compaction_write_rows_total->increment( diff --git a/be/src/storage/compaction/compaction.h b/be/src/storage/compaction/compaction.h index 772c8b30aa4172..35fc6efec24297 100644 --- a/be/src/storage/compaction/compaction.h +++ b/be/src/storage/compaction/compaction.h @@ -39,6 +39,7 @@ #include "storage/olap_common.h" #include "storage/rowid_conversion.h" #include "storage/rowset/pending_rowset_helper.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/rowset/rowset_fwd.h" #include "storage/tablet/tablet_fwd.h" @@ -74,6 +75,17 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; + virtual CompactionProfileType profile_type() const = 0; + + int64_t compaction_id() const { return _compaction_id; } + + // Public getters for tracker integration + int64_t input_rowsets_data_size() const { return _input_rowsets_data_size; } + int64_t input_row_num() const { return _input_row_num; } + int64_t input_rowsets_count() const { return static_cast(_input_rowsets.size()); } + int64_t input_segments_num_value() const { return _input_num_segments; } + bool is_vertical() const { return _is_vertical; } + std::string input_version_range_str() const; // the difference between index change compmaction and other compaction. // 1. delete predicate should be kept when input is cumu rowset. 
@@ -85,6 +97,11 @@ class Compaction { void set_delete_predicate_for_output_rowset(); protected: + void submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg = ""); + + virtual int64_t input_segments_num() const { return _input_num_segments; } + Status merge_input_rowsets(); // merge inverted index files @@ -115,6 +132,8 @@ class Compaction { BaseTabletSPtr _tablet; + int64_t _compaction_id {0}; + std::vector _input_rowsets; int64_t _input_rowsets_data_size {0}; int64_t _input_rowsets_index_size {0}; diff --git a/be/src/storage/compaction/compaction_task_tracker.cpp b/be/src/storage/compaction/compaction_task_tracker.cpp new file mode 100644 index 00000000000000..7b132e5d4b7e2a --- /dev/null +++ b/be/src/storage/compaction/compaction_task_tracker.cpp @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "storage/compaction/compaction_task_tracker.h" + +#include "common/config.h" + +namespace doris { + +const char* to_string(CompactionTaskStatus status) { + switch (status) { + case CompactionTaskStatus::PENDING: + return "PENDING"; + case CompactionTaskStatus::RUNNING: + return "RUNNING"; + case CompactionTaskStatus::FINISHED: + return "FINISHED"; + case CompactionTaskStatus::FAILED: + return "FAILED"; + } + return "UNKNOWN"; +} + +const char* to_string(CompactionProfileType type) { + switch (type) { + case CompactionProfileType::BASE: + return "base"; + case CompactionProfileType::CUMULATIVE: + return "cumulative"; + case CompactionProfileType::FULL: + return "full"; + case CompactionProfileType::SINGLE_REPLICA: + return "single_replica"; + case CompactionProfileType::COLD_DATA: + return "cold_data"; + case CompactionProfileType::INDEX_CHANGE: + return "index_change"; + } + return "unknown"; +} + +const char* to_string(TriggerMethod method) { + switch (method) { + case TriggerMethod::MANUAL: + return "MANUAL"; + case TriggerMethod::BACKGROUND: + return "BACKGROUND"; + } + return "UNKNOWN"; +} + +CompactionTaskTracker* CompactionTaskTracker::instance() { + static CompactionTaskTracker s_instance; + return &s_instance; +} + +void CompactionTaskTracker::register_task(CompactionTaskInfo info) { + std::unique_lock lock(_mutex); + _active_tasks[info.compaction_id] = std::move(info); +} + +void CompactionTaskTracker::update_to_running(int64_t compaction_id, const RunningStats& stats) { + std::unique_lock lock(_mutex); + auto it = _active_tasks.find(compaction_id); + if (it == _active_tasks.end()) { + return; + } + auto& info = it->second; + info.status = CompactionTaskStatus::RUNNING; + info.start_time_ms = stats.start_time_ms; + info.is_vertical = stats.is_vertical; + info.permits = stats.permits; +} + +void CompactionTaskTracker::_apply_completion(CompactionTaskInfo& info, + const CompletionStats& stats) { + // Backfill input_version_range if not already 
set by entry point + if (info.input_version_range.empty() && !stats.input_version_range.empty()) { + info.input_version_range = stats.input_version_range; + } + info.end_time_ms = stats.end_time_ms; + info.merged_rows = stats.merged_rows; + info.filtered_rows = stats.filtered_rows; + info.output_row_num = stats.output_row_num; + info.output_data_size = stats.output_data_size; + info.output_segments_num = stats.output_segments_num; + info.output_version = stats.output_version; + info.bytes_read_from_local = stats.bytes_read_from_local; + info.bytes_read_from_remote = stats.bytes_read_from_remote; + info.peak_memory_bytes = stats.peak_memory_bytes; +} + +void CompactionTaskTracker::complete(int64_t compaction_id, const CompletionStats& stats) { + std::unique_lock lock(_mutex); + + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + _active_tasks.erase(compaction_id); + _trim_completed_locked(); + return; + } + + auto it = _active_tasks.find(compaction_id); + if (it != _active_tasks.end()) { + auto info = std::move(it->second); + _active_tasks.erase(it); + info.status = CompactionTaskStatus::FINISHED; + _apply_completion(info, stats); + _completed_tasks.push_back(std::move(info)); + } else { + // Fallback: entry point missed register_task(), create a degraded record + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.status = CompactionTaskStatus::FINISHED; + info.trigger_method = TriggerMethod::BACKGROUND; + info.scheduled_time_ms = stats.end_time_ms; // best-effort + _apply_completion(info, stats); + _completed_tasks.push_back(std::move(info)); + } + _trim_completed_locked(); +} + +void CompactionTaskTracker::fail(int64_t compaction_id, const CompletionStats& stats, + const std::string& msg) { + std::unique_lock lock(_mutex); + + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + _active_tasks.erase(compaction_id); + _trim_completed_locked(); + return; + } + + auto it = 
_active_tasks.find(compaction_id); + if (it != _active_tasks.end()) { + auto info = std::move(it->second); + _active_tasks.erase(it); + info.status = CompactionTaskStatus::FAILED; + info.status_msg = msg; + _apply_completion(info, stats); + _completed_tasks.push_back(std::move(info)); + } else { + // Fallback: degraded record + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.status = CompactionTaskStatus::FAILED; + info.status_msg = msg; + info.trigger_method = TriggerMethod::BACKGROUND; + info.scheduled_time_ms = stats.end_time_ms; + _apply_completion(info, stats); + _completed_tasks.push_back(std::move(info)); + } + _trim_completed_locked(); +} + +void CompactionTaskTracker::remove_task(int64_t compaction_id) { + std::unique_lock lock(_mutex); + _active_tasks.erase(compaction_id); +} + +void CompactionTaskTracker::clear_for_test() { + std::unique_lock lock(_mutex); + _active_tasks.clear(); + _completed_tasks.clear(); +} + +void CompactionTaskTracker::_trim_completed_locked() { + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + _completed_tasks.clear(); + return; + } + while (static_cast(_completed_tasks.size()) > max) { + _completed_tasks.pop_front(); + } +} + +std::vector CompactionTaskTracker::get_all_tasks() const { + std::shared_lock lock(_mutex); + std::vector result; + result.reserve(_active_tasks.size() + _completed_tasks.size()); + for (const auto& [_, info] : _active_tasks) { + result.push_back(info); + } + for (const auto& info : _completed_tasks) { + result.push_back(info); + } + return result; +} + +std::vector CompactionTaskTracker::get_completed_tasks(int64_t tablet_id, + int64_t top_n) const { + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + return {}; + } + + std::shared_lock lock(_mutex); + std::vector result; + int32_t count = 0; + for (auto it = _completed_tasks.rbegin(); it != _completed_tasks.rend(); ++it) { + if (count >= max) { + break; + } + count++; + if (tablet_id 
!= 0 && it->tablet_id != tablet_id) { + continue; + } + result.push_back(*it); + if (top_n > 0 && static_cast(result.size()) >= top_n) { + break; + } + } + return result; +} + +} // namespace doris diff --git a/be/src/storage/compaction/compaction_task_tracker.h b/be/src/storage/compaction/compaction_task_tracker.h new file mode 100644 index 00000000000000..2d8627dcb5bd7a --- /dev/null +++ b/be/src/storage/compaction/compaction_task_tracker.h @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 

#pragma once

#include <atomic>
#include <cstdint>
#include <deque>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

namespace doris {

// Lifecycle state of a tracked compaction task.
enum class CompactionTaskStatus : uint8_t {
    PENDING = 0,
    RUNNING = 1,
    FINISHED = 2,
    FAILED = 3,
};

const char* to_string(CompactionTaskStatus status);

// Kind of compaction a task performs.
enum class CompactionProfileType : uint8_t {
    BASE = 0,
    CUMULATIVE = 1,
    FULL = 2,
    SINGLE_REPLICA = 3,
    COLD_DATA = 4,
    INDEX_CHANGE = 5,
};

const char* to_string(CompactionProfileType type);

// How a compaction task was triggered.
enum class TriggerMethod : uint8_t {
    MANUAL = 0,
    BACKGROUND = 1,
};

const char* to_string(TriggerMethod method);

// One tracked compaction task. Every numeric field defaults to 0 and every
// string to empty, so a default-constructed record is a PENDING, BACKGROUND,
// BASE task with no statistics.
struct CompactionTaskInfo {
    // identification
    int64_t compaction_id {0};
    int64_t backend_id {0};
    int64_t table_id {0};
    int64_t partition_id {0};
    int64_t tablet_id {0};

    // task attributes
    CompactionProfileType compaction_type {CompactionProfileType::BASE};
    CompactionTaskStatus status {CompactionTaskStatus::PENDING};
    TriggerMethod trigger_method {TriggerMethod::BACKGROUND};
    int64_t compaction_score {0};

    // timing
    int64_t scheduled_time_ms {0};
    int64_t start_time_ms {0};
    int64_t end_time_ms {0};

    // input stats (available after prepare_compact)
    int64_t input_rowsets_count {0};
    int64_t input_row_num {0};
    int64_t input_data_size {0};
    int64_t input_segments_num {0};
    std::string input_version_range;

    // output stats (available after complete/fail)
    int64_t merged_rows {0};
    int64_t filtered_rows {0};
    int64_t output_row_num {0};
    int64_t output_data_size {0};
    int64_t output_segments_num {0};
    std::string output_version;

    // IO stats
    int64_t bytes_read_from_local {0};
    int64_t bytes_read_from_remote {0};

    // resource
    int64_t peak_memory_bytes {0};
    bool is_vertical {false};
    int64_t permits {0};

    // error
    std::string status_msg;
};

// Fields recorded when a task moves to RUNNING (see update_to_running()).
struct RunningStats {
    int64_t start_time_ms {0};
    bool is_vertical {false};
    int64_t permits {0};
};

// Fields recorded when a task finishes or fails (see complete() / fail()).
struct CompletionStats {
    // input version range (backfill for fallback path and to ensure it's always set)
    std::string input_version_range;
    int64_t end_time_ms {0};
    int64_t merged_rows {0};
    int64_t filtered_rows {0};
    int64_t output_row_num {0};
    int64_t output_data_size {0};
    int64_t output_segments_num {0};
    std::string output_version;
    int64_t bytes_read_from_local {0};
    int64_t bytes_read_from_remote {0};
    int64_t peak_memory_bytes {0};
};

// Registry of compaction tasks: an active set keyed by compaction id plus a
// history of completed (FINISHED / FAILED) tasks. Accessed through instance().
class CompactionTaskTracker {
public:
    static CompactionTaskTracker* instance();

    // Single shared instance: copying is never meaningful. The members already
    // make the type non-copyable; this states the intent explicitly.
    CompactionTaskTracker(const CompactionTaskTracker&) = delete;
    CompactionTaskTracker& operator=(const CompactionTaskTracker&) = delete;

    // Hands out a new id from an atomic counter that starts at 1.
    int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); }

    // Adds (or replaces) an active task keyed by info.compaction_id.
    void register_task(CompactionTaskInfo info);
    // Marks an active task RUNNING; a no-op when the id is not active.
    void update_to_running(int64_t compaction_id, const RunningStats& stats);
    // Moves a task to the completed history as FINISHED.
    void complete(int64_t compaction_id, const CompletionStats& stats);
    // Moves a task to the completed history as FAILED with `msg` as status message.
    void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg);
    // Drops an active task without recording it in the history.
    void remove_task(int64_t compaction_id);

    // Returns active tasks + recent completed tasks snapshot.
    std::vector<CompactionTaskInfo> get_all_tasks() const;

    // Returns only completed tasks (for HTTP API compatibility).
    // tablet_id == 0 means "any tablet"; top_n <= 0 means "no limit".
    std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0,
                                                        int64_t top_n = 0) const;

    // Test only: clear all active and completed tasks.
    void clear_for_test();

private:
    CompactionTaskTracker() = default;

    // Apply completion stats to a task info, used by both complete() and fail()
    // and fallback path.
    void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats);

    // Evicts the oldest completed records beyond the configured limit.
    // The caller must hold _mutex exclusively.
    void _trim_completed_locked();

    std::atomic<int64_t> _next_id {1};

    // Guards _active_tasks and _completed_tasks.
    mutable std::shared_mutex _mutex;
    std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks;
    std::deque<CompactionTaskInfo> _completed_tasks;
};

} // namespace doris
_compaction_type; } + CompactionProfileType profile_type() const override { return CompactionProfileType::SINGLE_REPLICA; } protected: std::string_view compaction_name() const override { return "single replica compaction"; } diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index 498cc8e6c3115d..f681889c8f86e8 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -49,6 +49,7 @@ #include "agent/utils.h" #include "common/config.h" +#include "service/backend_options.h" #include "common/logging.h" #include "common/metrics/doris_metrics.h" #include "common/metrics/metrics.h" @@ -881,14 +882,41 @@ Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tab return Status::OK(); // No suitable version, regard as OK } + // Register task as PENDING in the tracker after successful prepare + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = TriggerMethod::BACKGROUND; + info.compaction_score = tablet->get_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + info.input_version_range = compaction->input_version_range_str(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + auto submit_st = _single_replica_compaction_thread_pool->submit_func( - [tablet, compaction = std::move(compaction), + [tablet, compaction = std::move(compaction), compaction_id, 
clean_single_replica_compaction]() mutable { + RunningStats rs; + rs.start_time_ms = UnixMillis(); + rs.is_vertical = compaction->is_vertical(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); tablet->execute_single_replica_compaction(*compaction); clean_single_replica_compaction(); }); if (!submit_st.ok()) { clean_single_replica_compaction(); + CompactionTaskTracker::instance()->remove_task(compaction_id); return Status::InternalError( "failed to submit single replica compaction task to thread pool, " "tablet_id={}", @@ -1053,7 +1081,8 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet } Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, - CompactionType compaction_type, bool force) { + CompactionType compaction_type, bool force, + TriggerMethod trigger_method) { if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() && should_fetch_from_peer(tablet->tablet_id())) { VLOG_CRITICAL << "start to submit single replica compaction task for tablet: " @@ -1078,6 +1107,28 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, compaction, permits); if (st.ok() && permits > 0) { + // Register task as PENDING in the tracker after successful prepare + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = trigger_method; + info.compaction_score = tablet->get_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = 
compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + info.input_version_range = compaction->input_version_range_str(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + if (!force) { _permit_limiter.request(permits); } @@ -1094,10 +1145,11 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, << ", max_threads: " << thread_pool->max_threads() << ", min_threads: " << thread_pool->min_threads() << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); - auto status = thread_pool->submit_func([=, compaction = std::move(compaction), this]() { - _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, permits, - force); - }); + auto status = thread_pool->submit_func( + [=, compaction = std::move(compaction), this]() { + _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, + permits, force); + }); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( _cumu_compaction_thread_pool->get_queue_size()); @@ -1111,6 +1163,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, } _pop_tablet_from_submitted_compaction(tablet, compaction_type); tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; + CompactionTaskTracker::instance()->remove_task(compaction_id); return Status::InternalError( "failed to submit compaction task to thread pool, " "tablet_id={}, compaction_type={}.", @@ -1153,6 +1206,11 @@ void StorageEngine::_handle_compaction(TabletSharedPtr tablet, } _pop_tablet_from_submitted_compaction(tablet, compaction_type); tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; + // Clean up any PENDING/RUNNING tracker entry that wasn't moved to + // completed by submit_profile_record(). 
This covers early-return + // branches (large task delay, tablet state change) and is a no-op + // if the task was already completed. + CompactionTaskTracker::instance()->remove_task(compaction->compaction_id()); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads--; @@ -1219,12 +1277,21 @@ void StorageEngine::_handle_compaction(TabletSharedPtr tablet, return; } tablet->compaction_stage = CompactionStage::EXECUTING; + // Update tracker to RUNNING when execution starts + { + RunningStats stats; + stats.start_time_ms = UnixMillis(); + stats.is_vertical = compaction->is_vertical(); + stats.permits = permits; + CompactionTaskTracker::instance()->update_to_running(compaction->compaction_id(), stats); + } TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); tablet->execute_compaction(*compaction); } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force, bool eager) { + bool force, bool eager, + TriggerMethod trigger_method) { if (!eager) { DCHECK(compaction_type == CompactionType::BASE_COMPACTION || compaction_type == CompactionType::CUMULATIVE_COMPACTION); @@ -1254,7 +1321,7 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy())); } tablet->set_skip_compaction(false); - return _submit_compaction_task(tablet, compaction_type, force); + return _submit_compaction_task(tablet, compaction_type, force, trigger_method); } Status StorageEngine::_handle_seg_compaction(std::shared_ptr worker, @@ -1632,6 +1699,32 @@ void StorageEngine::_handle_cold_data_compaction(TabletSharedPtr t) { return; } + // Register task as RUNNING directly (cold data compaction has no PENDING phase) + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; 
+ info.backend_id = BackendOptions::get_backend_id(); + info.table_id = t->table_id(); + info.partition_id = t->partition_id(); + info.tablet_id = t->tablet_id(); + info.compaction_type = compaction->profile_type(); + info.status = CompactionTaskStatus::RUNNING; + info.trigger_method = TriggerMethod::BACKGROUND; + info.compaction_score = t->calc_cold_data_compaction_score(); + info.scheduled_time_ms = UnixMillis(); + info.start_time_ms = UnixMillis(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + info.input_version_range = compaction->input_version_range_str(); + info.is_vertical = compaction->is_vertical(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + + // submit_profile_record() inside execute_compact() handles both + // success (complete) and failure (fail) tracker updates. st = compaction->execute_compact(); if (!st.ok()) { LOG(WARNING) << "failed to execute cold data compaction. 
tablet_id=" << t->tablet_id() diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 8b50d1c4d9bf65..8f46e21ef9785a 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -42,6 +42,7 @@ #include "common/status.h" #include "runtime/heartbeat_flags.h" #include "storage/compaction/compaction_permit_limiter.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/delete/calc_delete_bitmap_executor.h" #include "storage/olap_common.h" #include "storage/options.h" @@ -349,7 +350,8 @@ class StorageEngine final : public BaseStorageEngine { void get_compaction_status_json(std::string* result); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force, bool eager = true); + bool force, bool eager = true, + TriggerMethod trigger_method = TriggerMethod::BACKGROUND); Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments); @@ -445,7 +447,8 @@ class StorageEngine final : public BaseStorageEngine { CompactionType compaction_type); Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force); + bool force, + TriggerMethod trigger_method = TriggerMethod::BACKGROUND); void _handle_compaction(TabletSharedPtr tablet, std::shared_ptr compaction, CompactionType compaction_type, int64_t permits, bool force); diff --git a/be/src/storage/task/engine_cloud_index_change_task.cpp b/be/src/storage/task/engine_cloud_index_change_task.cpp index bcdb0c8a843735..61b044cf0e1630 100644 --- a/be/src/storage/task/engine_cloud_index_change_task.cpp +++ b/be/src/storage/task/engine_cloud_index_change_task.cpp @@ -20,7 +20,10 @@ #include "cloud/cloud_index_change_compaction.h" #include "cloud/cloud_tablet_mgr.h" #include "cpp/sync_point.h" +#include "service/backend_options.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/tablet/tablet_manager.h" +#include "util/time.h" 
namespace doris { @@ -138,7 +141,32 @@ Status EngineCloudIndexChangeTask::execute() { } } + // Register task as RUNNING directly (index change has no PENDING phase) + int64_t compaction_id = index_change_compact->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = BackendOptions::get_backend_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.tablet_id = tablet->tablet_id(); + info.compaction_type = index_change_compact->profile_type(); + info.status = CompactionTaskStatus::RUNNING; + info.trigger_method = TriggerMethod::BACKGROUND; + info.scheduled_time_ms = UnixMillis(); + info.start_time_ms = UnixMillis(); + info.input_rowsets_count = index_change_compact->input_rowsets_count(); + info.input_row_num = index_change_compact->input_row_num(); + info.input_data_size = index_change_compact->input_rowsets_data_size(); + info.input_segments_num = index_change_compact->input_segments_num_value(); + info.input_version_range = index_change_compact->input_version_range_str(); + info.is_vertical = index_change_compact->is_vertical(); + CompactionTaskTracker::instance()->register_task(std::move(info)); + } + VLOG_DEBUG << "[index_change] begin execute index change compact." << tablet_id_str; + // submit_profile_record() inside execute_compact() handles both + // success (complete) and failure (fail) tracker updates. Status exec_ret = index_change_compact->execute_compact(); if (!exec_ret.ok()) { LOG(WARNING) << "[index_change] exec index change compaction failed." 
<< tablet_id_str; diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp index 05eec3149f996a..0981a5a7a12c6f 100644 --- a/be/test/cloud/cloud_compaction_test.cpp +++ b/be/test/cloud/cloud_compaction_test.cpp @@ -239,6 +239,8 @@ class TestableCloudCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } std::string_view compaction_name() const override { return "test_compaction"; } + + CompactionProfileType profile_type() const override { return CompactionProfileType::CUMULATIVE; } }; TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) { diff --git a/be/test/storage/compaction/compaction_task_tracker_test.cpp b/be/test/storage/compaction/compaction_task_tracker_test.cpp new file mode 100644 index 00000000000000..4f2e1ba008f898 --- /dev/null +++ b/be/test/storage/compaction/compaction_task_tracker_test.cpp @@ -0,0 +1,413 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "storage/compaction/compaction_task_tracker.h" + +#include + +#include +#include + +#include "common/config.h" + +namespace doris { + +class CompactionTaskTrackerTest : public testing::Test { +protected: + void SetUp() override { + _saved_max_records = config::compaction_profile_max_records; + config::compaction_profile_max_records = 100; + CompactionTaskTracker::instance()->clear_for_test(); + } + + void TearDown() override { + CompactionTaskTracker::instance()->clear_for_test(); + config::compaction_profile_max_records = _saved_max_records; + } + + CompactionTaskInfo make_info(int64_t id, CompactionTaskStatus status, + TriggerMethod trigger = TriggerMethod::BACKGROUND) { + CompactionTaskInfo info; + info.compaction_id = id; + info.backend_id = 1; + info.table_id = 100; + info.partition_id = 200; + info.tablet_id = 300 + id; + info.compaction_type = CompactionProfileType::BASE; + info.status = status; + info.trigger_method = trigger; + info.compaction_score = 10; + info.scheduled_time_ms = 1000000; + info.input_rowsets_count = 5; + info.input_row_num = 1000; + info.input_data_size = 10000; + info.input_segments_num = 3; + info.input_version_range = "[0-5]"; + return info; + } + + CompletionStats make_completion_stats(const std::string& status_msg_for_version = "[0-5]") { + CompletionStats stats; + stats.input_version_range = "[0-5]"; + stats.end_time_ms = 2000000; + stats.merged_rows = 100; + stats.filtered_rows = 10; + stats.output_row_num = 890; + stats.output_data_size = 8000; + stats.output_segments_num = 1; + stats.output_version = status_msg_for_version; + stats.bytes_read_from_local = 5000; + stats.bytes_read_from_remote = 3000; + stats.peak_memory_bytes = 1024 * 1024; + return stats; + } + + // Helper to find a task by ID in a vector + const CompactionTaskInfo* find_task(const std::vector& tasks, + int64_t id) { + for (const auto& t : tasks) { + if (t.compaction_id == id) return &t; + } + return nullptr; + } + + int32_t _saved_max_records = 0; 
+}; + +TEST_F(CompactionTaskTrackerTest, FullLifecycle_PendingToRunningToFinished) { + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + // Register as PENDING + auto info = make_info(id, CompactionTaskStatus::PENDING); + tracker->register_task(info); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::PENDING); + EXPECT_EQ(task->tablet_id, 300 + id); + EXPECT_EQ(task->input_version_range, "[0-5]"); + + // Update to RUNNING + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = true; + rs.permits = 42; + tracker->update_to_running(id, rs); + + tasks = tracker->get_all_tasks(); + task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::RUNNING); + EXPECT_EQ(task->start_time_ms, 1500000); + EXPECT_TRUE(task->is_vertical); + EXPECT_EQ(task->permits, 42); + // Fields from PENDING should be preserved + EXPECT_EQ(task->input_version_range, "[0-5]"); + + // Complete + auto cs = make_completion_stats(); + tracker->complete(id, cs); + + tasks = tracker->get_all_tasks(); + task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->merged_rows, 100); + EXPECT_EQ(task->output_row_num, 890); + EXPECT_EQ(task->peak_memory_bytes, 1024 * 1024); + EXPECT_EQ(task->tablet_id, 300 + id); + EXPECT_EQ(task->start_time_ms, 1500000); +} + +TEST_F(CompactionTaskTrackerTest, DirectRunning_ManualTrigger) { + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto info = make_info(id, CompactionTaskStatus::RUNNING, TriggerMethod::MANUAL); + info.start_time_ms = 1500000; + tracker->register_task(info); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, 
CompactionTaskStatus::RUNNING); + EXPECT_EQ(task->trigger_method, TriggerMethod::MANUAL); + + tracker->complete(id, make_completion_stats()); + + tasks = tracker->get_all_tasks(); + task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->trigger_method, TriggerMethod::MANUAL); +} + +TEST_F(CompactionTaskTrackerTest, FailPath_StatusMsgPreserved) { + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto info = make_info(id, CompactionTaskStatus::RUNNING); + tracker->register_task(info); + + auto cs = make_completion_stats(); + tracker->fail(id, cs, "compaction failed: tablet corruption detected"); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FAILED); + EXPECT_EQ(task->status_msg, "compaction failed: tablet corruption detected"); + EXPECT_EQ(task->merged_rows, 100); // completion stats still filled +} + +TEST_F(CompactionTaskTrackerTest, RemoveTask_CleansUpPending) { + // Simulates: task registered as PENDING, then prepare fails or early return, + // so remove_task() is called to clean up. + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto info = make_info(id, CompactionTaskStatus::PENDING); + tracker->register_task(info); + + // Simulate early return (e.g., large task delay, tablet state change) + tracker->remove_task(id); + + auto tasks = tracker->get_all_tasks(); + EXPECT_EQ(find_task(tasks, id), nullptr); +} + +TEST_F(CompactionTaskTrackerTest, RemoveTask_NoOpAfterComplete) { + // Verifies that remove_task() is safe to call after complete() — it's a no-op. + // This matches the Defer pattern in _handle_compaction(). 
+ auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto info = make_info(id, CompactionTaskStatus::RUNNING); + tracker->register_task(info); + tracker->complete(id, make_completion_stats()); + + // This should be a no-op — the task is in _completed_tasks, not _active_tasks + tracker->remove_task(id); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); // Still in completed + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); +} + +TEST_F(CompactionTaskTrackerTest, FallbackRecord_NoRegister) { + // Simulates: entry point forgot to register_task(), but submit_profile_record() + // calls complete/fail. Tracker creates a degraded record. + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto cs = make_completion_stats(); + tracker->complete(id, cs); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->trigger_method, TriggerMethod::BACKGROUND); // default + EXPECT_EQ(task->scheduled_time_ms, cs.end_time_ms); // fallback + EXPECT_EQ(task->compaction_score, 0); // not available + EXPECT_EQ(task->merged_rows, 100); // completion stats present +} + +TEST_F(CompactionTaskTrackerTest, FallbackRecord_FailWithStatusMsg) { + // Simulates: entry point forgot to register, execute_compact fails with a real error. 
+ auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + auto cs = make_completion_stats(); + tracker->fail(id, cs, "[INTERNAL_ERROR]compaction failed: disk full"); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FAILED); + EXPECT_EQ(task->status_msg, "[INTERNAL_ERROR]compaction failed: disk full"); +} + +TEST_F(CompactionTaskTrackerTest, InputVersionRange_BackfillOnComplete) { + // Verifies: if input_version_range was not set at registration, + // it gets backfilled from CompletionStats. + auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + CompactionTaskInfo info; + info.compaction_id = id; + info.status = CompactionTaskStatus::RUNNING; + // input_version_range is empty + tracker->register_task(info); + + auto cs = make_completion_stats(); + cs.input_version_range = "[2-10]"; + tracker->complete(id, cs); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->input_version_range, "[2-10]"); +} + +TEST_F(CompactionTaskTrackerTest, InputVersionRange_NotOverwrittenIfAlreadySet) { + // If input_version_range was set at registration, it should NOT be overwritten. 
+ auto* tracker = CompactionTaskTracker::instance(); + int64_t id = tracker->next_compaction_id(); + + CompactionTaskInfo info; + info.compaction_id = id; + info.status = CompactionTaskStatus::RUNNING; + info.input_version_range = "[0-5]"; + tracker->register_task(info); + + auto cs = make_completion_stats(); + cs.input_version_range = "[2-10]"; // different + tracker->complete(id, cs); + + auto tasks = tracker->get_all_tasks(); + auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->input_version_range, "[0-5]"); // original preserved +} + +TEST_F(CompactionTaskTrackerTest, TrimCompleted) { + auto* tracker = CompactionTaskTracker::instance(); + config::compaction_profile_max_records = 10; + + for (int i = 0; i < 20; i++) { + int64_t id = tracker->next_compaction_id(); + auto info = make_info(id, CompactionTaskStatus::RUNNING); + tracker->register_task(info); + tracker->complete(id, make_completion_stats()); + } + + auto tasks = tracker->get_all_tasks(); + int completed = 0; + for (const auto& t : tasks) { + if (t.status == CompactionTaskStatus::FINISHED) { + completed++; + } + } + EXPECT_LE(completed, 10); +} + +TEST_F(CompactionTaskTrackerTest, DisableConfig_ClearsCompletedButKeepsActive) { + auto* tracker = CompactionTaskTracker::instance(); + + // Create a completed task + int64_t id1 = tracker->next_compaction_id(); + auto info1 = make_info(id1, CompactionTaskStatus::RUNNING); + tracker->register_task(info1); + tracker->complete(id1, make_completion_stats()); + + // Create an active task + int64_t id2 = tracker->next_compaction_id(); + auto info2 = make_info(id2, CompactionTaskStatus::PENDING); + tracker->register_task(info2); + + // Disable + config::compaction_profile_max_records = 0; + auto completed = tracker->get_completed_tasks(); + EXPECT_TRUE(completed.empty()); + + // Active task should still be visible + auto all = tracker->get_all_tasks(); + EXPECT_NE(find_task(all, id2), nullptr); + + tracker->remove_task(id2); +} + 
+TEST_F(CompactionTaskTrackerTest, ConcurrentSafety) { + auto* tracker = CompactionTaskTracker::instance(); + constexpr int kThreads = 8; + constexpr int kOpsPerThread = 50; + + std::vector threads; + for (int t = 0; t < kThreads; t++) { + threads.emplace_back([tracker]() { + for (int i = 0; i < kOpsPerThread; i++) { + int64_t id = tracker->next_compaction_id(); + CompactionTaskInfo info; + info.compaction_id = id; + info.status = CompactionTaskStatus::RUNNING; + tracker->register_task(info); + + CompletionStats cs; + cs.end_time_ms = 1000; + tracker->complete(id, cs); + + tracker->get_all_tasks(); + } + }); + } + for (auto& th : threads) { + th.join(); + } + + auto tasks = tracker->get_all_tasks(); + int completed = 0; + for (const auto& t : tasks) { + if (t.status == CompactionTaskStatus::FINISHED) completed++; + } + EXPECT_GT(completed, 0); +} + +TEST_F(CompactionTaskTrackerTest, GetCompletedTasks_FilterByTabletAndTopN) { + auto* tracker = CompactionTaskTracker::instance(); + + for (int i = 0; i < 5; i++) { + int64_t id = tracker->next_compaction_id(); + CompactionTaskInfo info; + info.compaction_id = id; + info.tablet_id = (i < 3) ? 
1001 : 1002; + info.status = CompactionTaskStatus::RUNNING; + tracker->register_task(info); + tracker->complete(id, make_completion_stats()); + } + + auto filtered = tracker->get_completed_tasks(1001); + EXPECT_EQ(filtered.size(), 3); + for (const auto& t : filtered) { + EXPECT_EQ(t.tablet_id, 1001); + } + + auto top2 = tracker->get_completed_tasks(0, 2); + EXPECT_EQ(top2.size(), 2); +} + +TEST_F(CompactionTaskTrackerTest, ToStringFunctions) { + EXPECT_STREQ(to_string(CompactionTaskStatus::PENDING), "PENDING"); + EXPECT_STREQ(to_string(CompactionTaskStatus::RUNNING), "RUNNING"); + EXPECT_STREQ(to_string(CompactionTaskStatus::FINISHED), "FINISHED"); + EXPECT_STREQ(to_string(CompactionTaskStatus::FAILED), "FAILED"); + + EXPECT_STREQ(to_string(CompactionProfileType::BASE), "base"); + EXPECT_STREQ(to_string(CompactionProfileType::CUMULATIVE), "cumulative"); + EXPECT_STREQ(to_string(CompactionProfileType::FULL), "full"); + EXPECT_STREQ(to_string(CompactionProfileType::SINGLE_REPLICA), "single_replica"); + EXPECT_STREQ(to_string(CompactionProfileType::COLD_DATA), "cold_data"); + EXPECT_STREQ(to_string(CompactionProfileType::INDEX_CHANGE), "index_change"); + + EXPECT_STREQ(to_string(TriggerMethod::MANUAL), "MANUAL"); + EXPECT_STREQ(to_string(TriggerMethod::BACKGROUND), "BACKGROUND"); +} + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 02ea8a610a4eec..4c6a7a9755decf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -113,6 +113,8 @@ public enum SchemaTableType { TSchemaTableType.SCH_BLACKHOLE), SCH_COLUMN_DATA_SIZES("COLUMN_DATA_SIZES", "COLUMN_DATA_SIZES", TSchemaTableType.SCH_COLUMN_DATA_SIZES), + SCH_BE_COMPACTION_TASKS("BE_COMPACTION_TASKS", "BE_COMPACTION_TASKS", + TSchemaTableType.SCH_BE_COMPACTION_TASKS), 
SCH_DATABASE_PROPERTIES("DATABASE_PROPERTIES", "DATABASE_PROPERTIES", TSchemaTableType.SCH_DATABASE_PROPERTIES); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index d2f11508db231c..758ad9ea8120cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -412,6 +412,38 @@ public class SchemaTable extends Table { .column("CHARACTER_SET_CLIENT", ScalarType.createVarchar(32)) .column("COLLATION_CONNECTION", ScalarType.createVarchar(32)) .column("DATABASE_COLLATION", ScalarType.createVarchar(32)).build())) + .put("be_compaction_tasks", new SchemaTable(SystemIdGenerator.getNextId(), "be_compaction_tasks", TableType.SCHEMA, + builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("COMPACTION_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TABLE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("PARTITION_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TABLET_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("COMPACTION_TYPE", ScalarType.createVarchar(64)) + .column("STATUS", ScalarType.createVarchar(16)) + .column("TRIGGER_METHOD", ScalarType.createVarchar(16)) + .column("COMPACTION_SCORE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCHEDULED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("START_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("END_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("ELAPSED_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_ROWSETS_COUNT", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_ROW_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_DATA_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_SEGMENTS_NUM", 
ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_VERSION_RANGE", ScalarType.createVarchar(64)) + .column("MERGED_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("FILTERED_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_ROW_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_DATA_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_SEGMENTS_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_VERSION", ScalarType.createVarchar(64)) + .column("BYTES_READ_FROM_LOCAL", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BYTES_READ_FROM_REMOTE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("IS_VERTICAL", ScalarType.createType(PrimitiveType.BOOLEAN)) + .column("PERMITS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("STATUS_MSG", ScalarType.createVarchar(1024)) + .build())) .put("rowsets", new SchemaTable(SystemIdGenerator.getNextId(), "rowsets", TableType.SCHEMA, builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("ROWSET_ID", ScalarType.createVarchar(64)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 4338ecbabd92bc..345b6be6d003f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -77,6 +77,8 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { BACKEND_TABLE.add("backend_configuration"); BACKEND_TABLE.add("column_data_sizes"); + + BACKEND_TABLE.add("be_compaction_tasks"); } public static boolean isBackendPartitionedSchemaTable(String tableName) { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift 
index 730a74ae6f34a8..fe4e72e8094170 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -214,6 +214,7 @@ enum TSchemaTableType { SCH_LOAD_JOBS = 64; SCH_FILE_CACHE_INFO = 65; SCH_DATABASE_PROPERTIES = 66; + SCH_BE_COMPACTION_TASKS = 67; } enum THdfsCompression { diff --git a/regression-test/suites/compaction/test_be_compaction_tasks.groovy b/regression-test/suites/compaction/test_be_compaction_tasks.groovy new file mode 100644 index 00000000000000..74c3c4e65180d9 --- /dev/null +++ b/regression-test/suites/compaction/test_be_compaction_tasks.groovy @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_be_compaction_tasks") { + def tableName = "test_be_compaction_tasks" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL + ) DUPLICATE KEY (`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + // Insert multiple batches to create rowsets for compaction + for (i in 0..<10) { + sql """ INSERT INTO ${tableName} VALUES(${i}, "row_${i}") """ + } + + // 1. 
Test: basic query returns valid result set + def result1 = sql """ SELECT * FROM information_schema.be_compaction_tasks LIMIT 10 """ + log.info("Initial query result size: ${result1.size()}") + + // 2. Trigger a cumulative compaction via HTTP API + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def tablets = sql_return_maparray """ show tablets from ${tableName} """ + def tablet = tablets[0] + def tablet_id = tablet.TabletId + def be_host = backendId_to_backendIP["${tablet.BackendId}"] + def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"] + + def (code2, text2, err2) = be_run_cumulative_compaction(be_host, be_port, tablet_id) + log.info("Trigger compaction response: ${text2}") + + // Wait for compaction to finish + def running = true + def maxWait = 30 + while (running && maxWait > 0) { + Thread.sleep(1000) + def (code, out, err) = be_get_compaction_status(be_host, be_port, tablet_id) + def status = parseJson(out.trim()) + running = status.run_status + maxWait-- + } + assertFalse(running, "Compaction did not finish in time") + + // 3. 
Test: FINISHED record should appear with correct fields + def result3 = sql """ + SELECT COMPACTION_ID, COMPACTION_TYPE, STATUS, TRIGGER_METHOD, + TABLET_ID, INPUT_ROWSETS_COUNT, INPUT_ROW_NUM + FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} AND STATUS = 'FINISHED' + ORDER BY COMPACTION_ID DESC + LIMIT 1 + """ + log.info("Compaction tasks for tablet: ${result3}") + assertTrue(result3.size() > 0, "Expected at least one FINISHED record after compaction") + + def row = result3[0] + assertNotNull(row[0], "COMPACTION_ID should not be null") + assertEquals("cumulative", row[1].toString()) + assertEquals("FINISHED", row[2].toString()) + assertEquals("MANUAL", row[3].toString()) + assertTrue(Long.parseLong(row[5].toString()) > 0, "INPUT_ROWSETS_COUNT should be > 0") + assertTrue(Long.parseLong(row[6].toString()) > 0, "INPUT_ROW_NUM should be > 0") + + // 4. Test: filter by STATUS + def result4 = sql """ + SELECT COUNT(*) FROM information_schema.be_compaction_tasks + WHERE STATUS = 'FINISHED' + """ + assertTrue(Long.parseLong(result4[0][0].toString()) >= 1) + + // 5. Test: BACKEND_ID is populated + def result5 = sql """ + SELECT BACKEND_ID FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} + LIMIT 1 + """ + assertTrue(result5.size() > 0) + assertTrue(Long.parseLong(result5[0][0].toString()) > 0, "BACKEND_ID should be positive") + + // 6. Test: non-existent tablet returns empty + def result6 = sql """ + SELECT * FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = 999999999 + """ + assertEquals(0, result6.size()) + + sql """ DROP TABLE IF EXISTS ${tableName} """ +} From e66a5ffe936841370533b70cdaa5d5369468e630 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 17 Mar 2026 15:36:30 +0800 Subject: [PATCH 2/3] [Enhancement](compaction) add full column verification to be_compaction_tasks regression test Add SELECT * and explicit 30-column named SELECT to verify all columns are queryable with reasonable values. 
Log each column for visual inspection. Also add DESC test to verify schema has exactly 30 columns. --- .../org/apache/doris/catalog/SchemaTable.java | 3 +- .../test_be_compaction_tasks.groovy | 110 +++++++++++++++++- 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 758ad9ea8120cc..1a2bfe5abd9d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -412,7 +412,8 @@ public class SchemaTable extends Table { .column("CHARACTER_SET_CLIENT", ScalarType.createVarchar(32)) .column("COLLATION_CONNECTION", ScalarType.createVarchar(32)) .column("DATABASE_COLLATION", ScalarType.createVarchar(32)).build())) - .put("be_compaction_tasks", new SchemaTable(SystemIdGenerator.getNextId(), "be_compaction_tasks", TableType.SCHEMA, + .put("be_compaction_tasks", new SchemaTable(SystemIdGenerator.getNextId(), + "be_compaction_tasks", TableType.SCHEMA, builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("COMPACTION_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("TABLE_ID", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/regression-test/suites/compaction/test_be_compaction_tasks.groovy b/regression-test/suites/compaction/test_be_compaction_tasks.groovy index 74c3c4e65180d9..9d9f12275d9961 100644 --- a/regression-test/suites/compaction/test_be_compaction_tasks.groovy +++ b/regression-test/suites/compaction/test_be_compaction_tasks.groovy @@ -48,8 +48,21 @@ suite("test_be_compaction_tasks") { def be_host = backendId_to_backendIP["${tablet.BackendId}"] def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"] - def (code2, text2, err2) = be_run_cumulative_compaction(be_host, be_port, tablet_id) - log.info("Trigger compaction response: ${text2}") + // Trigger cumulative 
compaction with retry + def triggerSuccess = false + for (int attempt = 0; attempt < 3; attempt++) { + def (code2, text2, err2) = be_run_cumulative_compaction(be_host, be_port, tablet_id) + log.info("Trigger compaction attempt ${attempt}: code=${code2}, out=${text2}, err=${err2}") + if (text2 != null && !text2.trim().isEmpty()) { + def triggerResult = parseJson(text2.trim()) + if (triggerResult.status.toLowerCase() == "success") { + triggerSuccess = true + break + } + } + Thread.sleep(2000) + } + assertTrue(triggerSuccess, "Failed to trigger cumulative compaction") // Wait for compaction to finish def running = true @@ -57,6 +70,11 @@ suite("test_be_compaction_tasks") { while (running && maxWait > 0) { Thread.sleep(1000) def (code, out, err) = be_get_compaction_status(be_host, be_port, tablet_id) + log.info("Compaction status: code=${code}, out=${out}, err=${err}") + if (out == null || out.trim().isEmpty()) { + maxWait-- + continue + } def status = parseJson(out.trim()) running = status.run_status maxWait-- @@ -106,5 +124,93 @@ suite("test_be_compaction_tasks") { """ assertEquals(0, result6.size()) + // 7. Test: SELECT * to verify all 30 columns are queryable and log full row + def resultAll = sql """ + SELECT * FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} + ORDER BY COMPACTION_ID DESC + LIMIT 1 + """ + assertTrue(resultAll.size() > 0, "Expected at least one record for SELECT *") + assertEquals(30, resultAll[0].size(), "Expected 30 columns in be_compaction_tasks") + log.info("Full row (SELECT *): ${resultAll[0]}") + + // 8. 
Test: query all 30 columns by name to verify each column is accessible + def resultNamed = sql """ + SELECT BACKEND_ID, COMPACTION_ID, TABLE_ID, PARTITION_ID, TABLET_ID, + COMPACTION_TYPE, STATUS, TRIGGER_METHOD, COMPACTION_SCORE, + SCHEDULED_TIME, START_TIME, END_TIME, ELAPSED_TIME_MS, + INPUT_ROWSETS_COUNT, INPUT_ROW_NUM, INPUT_DATA_SIZE, INPUT_SEGMENTS_NUM, INPUT_VERSION_RANGE, + MERGED_ROWS, FILTERED_ROWS, + OUTPUT_ROW_NUM, OUTPUT_DATA_SIZE, OUTPUT_SEGMENTS_NUM, OUTPUT_VERSION, + BYTES_READ_FROM_LOCAL, BYTES_READ_FROM_REMOTE, PEAK_MEMORY_BYTES, + IS_VERTICAL, PERMITS, STATUS_MSG + FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} AND STATUS = 'FINISHED' + ORDER BY COMPACTION_ID DESC + LIMIT 1 + """ + assertTrue(resultNamed.size() > 0) + def r = resultNamed[0] + // Log each column value for visual inspection + log.info("=== be_compaction_tasks full column dump ===") + log.info("BACKEND_ID: ${r[0]}") + log.info("COMPACTION_ID: ${r[1]}") + log.info("TABLE_ID: ${r[2]}") + log.info("PARTITION_ID: ${r[3]}") + log.info("TABLET_ID: ${r[4]}") + log.info("COMPACTION_TYPE: ${r[5]}") + log.info("STATUS: ${r[6]}") + log.info("TRIGGER_METHOD: ${r[7]}") + log.info("COMPACTION_SCORE: ${r[8]}") + log.info("SCHEDULED_TIME: ${r[9]}") + log.info("START_TIME: ${r[10]}") + log.info("END_TIME: ${r[11]}") + log.info("ELAPSED_TIME_MS: ${r[12]}") + log.info("INPUT_ROWSETS_COUNT: ${r[13]}") + log.info("INPUT_ROW_NUM: ${r[14]}") + log.info("INPUT_DATA_SIZE: ${r[15]}") + log.info("INPUT_SEGMENTS_NUM: ${r[16]}") + log.info("INPUT_VERSION_RANGE: ${r[17]}") + log.info("MERGED_ROWS: ${r[18]}") + log.info("FILTERED_ROWS: ${r[19]}") + log.info("OUTPUT_ROW_NUM: ${r[20]}") + log.info("OUTPUT_DATA_SIZE: ${r[21]}") + log.info("OUTPUT_SEGMENTS_NUM: ${r[22]}") + log.info("OUTPUT_VERSION: ${r[23]}") + log.info("BYTES_READ_FROM_LOCAL: ${r[24]}") + log.info("BYTES_READ_FROM_REMOTE:${r[25]}") + log.info("PEAK_MEMORY_BYTES: ${r[26]}") + log.info("IS_VERTICAL: ${r[27]}") + 
log.info("PERMITS: ${r[28]}") + log.info("STATUS_MSG: ${r[29]}") + + // Verify key columns have reasonable values for a completed compaction + assertTrue(Long.parseLong(r[0].toString()) > 0, "BACKEND_ID > 0") + assertTrue(Long.parseLong(r[1].toString()) > 0, "COMPACTION_ID > 0") + assertTrue(Long.parseLong(r[2].toString()) > 0, "TABLE_ID > 0") + assertTrue(Long.parseLong(r[3].toString()) > 0, "PARTITION_ID > 0") + assertEquals(tablet_id, r[4].toString(), "TABLET_ID matches") + assertEquals("cumulative", r[5].toString()) + assertEquals("FINISHED", r[6].toString()) + assertEquals("MANUAL", r[7].toString()) + assertNotNull(r[9], "SCHEDULED_TIME not null") + assertNotNull(r[10], "START_TIME not null") + assertNotNull(r[11], "END_TIME not null") + assertTrue(Long.parseLong(r[12].toString()) >= 0, "ELAPSED_TIME_MS >= 0") + assertTrue(Long.parseLong(r[13].toString()) > 0, "INPUT_ROWSETS_COUNT > 0") + assertTrue(Long.parseLong(r[14].toString()) > 0, "INPUT_ROW_NUM > 0") + assertTrue(Long.parseLong(r[15].toString()) > 0, "INPUT_DATA_SIZE > 0") + assertTrue(Long.parseLong(r[16].toString()) >= 0, "INPUT_SEGMENTS_NUM >= 0") + assertTrue(r[17].toString().length() > 0, "INPUT_VERSION_RANGE not empty") + assertTrue(Long.parseLong(r[20].toString()) >= 0, "OUTPUT_ROW_NUM >= 0") + assertTrue(Long.parseLong(r[22].toString()) >= 0, "OUTPUT_SEGMENTS_NUM >= 0") + assertTrue(r[23].toString().length() > 0, "OUTPUT_VERSION not empty") + + // 9. 
Test: DESC to verify table schema + def descResult = sql """ DESC information_schema.be_compaction_tasks """ + log.info("DESC result: ${descResult}") + assertEquals(30, descResult.size(), "Expected 30 columns in DESC") + sql """ DROP TABLE IF EXISTS ${tableName} """ } From e8d6bb12c2b24039b77925bd4591f17cd4a15684 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 17 Mar 2026 17:27:04 +0800 Subject: [PATCH 3/3] [Chore](compaction) apply clang-format to be_compaction_tasks changes --- be/src/cloud/cloud_compaction_action.cpp | 8 ++--- be/src/cloud/cloud_cumulative_compaction.h | 4 ++- be/src/cloud/cloud_index_change_compaction.h | 4 ++- be/src/cloud/cloud_storage_engine.cpp | 8 ++--- be/src/cloud/cloud_storage_engine.h | 8 ++--- .../schema_compaction_tasks_scanner.cpp | 16 ++++----- be/src/information_schema/schema_scanner.cpp | 2 +- .../service/http/action/compaction_action.cpp | 12 +++---- be/src/storage/compaction/compaction.cpp | 36 +++++++++---------- be/src/storage/compaction/compaction.h | 2 +- .../compaction/cumulative_compaction.h | 4 ++- .../compaction/single_replica_compaction.h | 4 ++- be/src/storage/olap_server.cpp | 14 ++++---- be/test/cloud/cloud_compaction_test.cpp | 4 ++- .../compaction_task_tracker_test.cpp | 7 ++-- 15 files changed, 69 insertions(+), 64 deletions(-) diff --git a/be/src/cloud/cloud_compaction_action.cpp b/be/src/cloud/cloud_compaction_action.cpp index 0be27c373208d9..2f3f187d5d70d0 100644 --- a/be/src/cloud/cloud_compaction_action.cpp +++ b/be/src/cloud/cloud_compaction_action.cpp @@ -167,10 +167,10 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri << " table id: " << table_id; // 3. submit compaction task RETURN_IF_ERROR(_engine.submit_compaction_task( - tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION - : compaction_type == PARAM_COMPACTION_CUMULATIVE - ? CompactionType::CUMULATIVE_COMPACTION - : CompactionType::FULL_COMPACTION, + tablet, + compaction_type == PARAM_COMPACTION_BASE ?
CompactionType::BASE_COMPACTION + : compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION + : CompactionType::FULL_COMPACTION, TriggerMethod::MANUAL)); LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 1fc3da8ccb89f3..7f07ea7f225742 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -40,7 +40,9 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { int64_t get_input_rowsets_bytes() const { return _input_rowsets_total_size; } int64_t get_input_num_rows() const { return _input_row_num; } - CompactionProfileType profile_type() const override { return CompactionProfileType::CUMULATIVE; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } private: Status pick_rowsets_to_compact(); diff --git a/be/src/cloud/cloud_index_change_compaction.h b/be/src/cloud/cloud_index_change_compaction.h index aa87b2f4ba03e1..f6e488597c1d9c 100644 --- a/be/src/cloud/cloud_index_change_compaction.h +++ b/be/src/cloud/cloud_index_change_compaction.h @@ -45,7 +45,9 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin { bool is_base_compaction() const { return _compact_type == cloud::TabletCompactionJobPB::BASE; } - CompactionProfileType profile_type() const override { return CompactionProfileType::INDEX_CHANGE; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::INDEX_CHANGE; + } Status rebuild_tablet_schema() override; diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 9d84d5a0354e30..005e332da6cc90 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -56,8 +56,8 @@ #include "io/hdfs_util.h" #include "io/io_common.h" #include 
"load/memtable/memtable_flush_executor.h" -#include "service/backend_options.h" #include "runtime/memory/cache_manager.h" +#include "service/backend_options.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" #include "storage/storage_policy.h" @@ -795,7 +795,7 @@ Status CloudStorageEngine::_request_tablet_global_compaction_lock( } Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method) { + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -895,7 +895,7 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t } Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method) { + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -1085,7 +1085,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS } Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method) { + TriggerMethod trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 20e90142a2ca98..232ae819d6c5a3 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -134,7 +134,7 @@ class CloudStorageEngine final : public BaseStorageEngine { std::vector>& res); Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type, - TriggerMethod trigger_method = TriggerMethod::BACKGROUND); + TriggerMethod trigger_method = TriggerMethod::BACKGROUND); Status get_compaction_status_json(std::string* result); @@ -201,11 +201,11 @@ class CloudStorageEngine final : public BaseStorageEngine { bool check_score); 
Status _adjust_compaction_thread_num(); Status _submit_base_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method); + TriggerMethod trigger_method); Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method); + TriggerMethod trigger_method); Status _submit_full_compaction_task(const CloudTabletSPtr& tablet, - TriggerMethod trigger_method); + TriggerMethod trigger_method); Status _request_tablet_global_compaction_lock(ReaderType compaction_type, const CloudTabletSPtr& tablet, std::shared_ptr compaction); diff --git a/be/src/information_schema/schema_compaction_tasks_scanner.cpp b/be/src/information_schema/schema_compaction_tasks_scanner.cpp index 57e47ce53bd664..0913abe5ce390c 100644 --- a/be/src/information_schema/schema_compaction_tasks_scanner.cpp +++ b/be/src/information_schema/schema_compaction_tasks_scanner.cpp @@ -158,7 +158,7 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { strs_storage[i - fill_idx_begin] = to_string(_tasks[i].compaction_type); strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), - strs_storage[i - fill_idx_begin].size()); + strs_storage[i - fill_idx_begin].size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 5, datas)); @@ -170,7 +170,7 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { strs_storage[i - fill_idx_begin] = to_string(_tasks[i].status); strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), - strs_storage[i - fill_idx_begin].size()); + strs_storage[i - fill_idx_begin].size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 6, datas)); @@ -182,7 +182,7 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* 
block) { for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { strs_storage[i - fill_idx_begin] = to_string(_tasks[i].trigger_method); strs[i - fill_idx_begin] = StringRef(strs_storage[i - fill_idx_begin].c_str(), - strs_storage[i - fill_idx_begin].size()); + strs_storage[i - fill_idx_begin].size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 7, datas)); @@ -296,7 +296,7 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { std::vector strs(fill_tasks_num); for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { strs[i - fill_idx_begin] = StringRef(_tasks[i].input_version_range.c_str(), - _tasks[i].input_version_range.size()); + _tasks[i].input_version_range.size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas)); @@ -350,8 +350,8 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { { std::vector strs(fill_tasks_num); for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { - strs[i - fill_idx_begin] = StringRef(_tasks[i].output_version.c_str(), - _tasks[i].output_version.size()); + strs[i - fill_idx_begin] = + StringRef(_tasks[i].output_version.c_str(), _tasks[i].output_version.size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 23, datas)); @@ -405,8 +405,8 @@ Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { { std::vector strs(fill_tasks_num); for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { - strs[i - fill_idx_begin] = StringRef(_tasks[i].status_msg.c_str(), - _tasks[i].status_msg.size()); + strs[i - fill_idx_begin] = + StringRef(_tasks[i].status_msg.c_str(), _tasks[i].status_msg.size()); datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin; } RETURN_IF_ERROR(fill_dest_column_for_range(block, 29, datas)); diff --git a/be/src/information_schema/schema_scanner.cpp 
b/be/src/information_schema/schema_scanner.cpp index b56cc0a4480305..5618cba011b41f 100644 --- a/be/src/information_schema/schema_scanner.cpp +++ b/be/src/information_schema/schema_scanner.cpp @@ -49,12 +49,12 @@ #include "information_schema/schema_backend_kerberos_ticket_cache.h" #include "information_schema/schema_catalog_meta_cache_stats_scanner.h" #include "information_schema/schema_charsets_scanner.h" -#include "information_schema/schema_compaction_tasks_scanner.h" #include "information_schema/schema_cluster_snapshot_properties_scanner.h" #include "information_schema/schema_cluster_snapshots_scanner.h" #include "information_schema/schema_collations_scanner.h" #include "information_schema/schema_column_data_sizes_scanner.h" #include "information_schema/schema_columns_scanner.h" +#include "information_schema/schema_compaction_tasks_scanner.h" #include "information_schema/schema_database_properties_scanner.h" #include "information_schema/schema_dummy_scanner.h" #include "information_schema/schema_encryption_keys_scanner.h" diff --git a/be/src/service/http/action/compaction_action.cpp b/be/src/service/http/action/compaction_action.cpp index 40ba4cef48b5d2..48b3b019f1a7bc 100644 --- a/be/src/service/http/action/compaction_action.cpp +++ b/be/src/service/http/action/compaction_action.cpp @@ -157,9 +157,9 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); for (const auto& tablet : tablet_vec) { tablet->set_last_full_compaction_schedule_time(UnixMillis()); - RETURN_IF_ERROR(_engine.submit_compaction_task( - tablet, CompactionType::FULL_COMPACTION, false, /*eager=*/true, - TriggerMethod::MANUAL)); + RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, + false, /*eager=*/true, + TriggerMethod::MANUAL)); } } else { // 2. 
fetch the tablet by tablet_id @@ -172,9 +172,9 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j return Status::NotSupported("tablet should do compaction locally"); } DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", { - RETURN_IF_ERROR(_engine.submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, false, /*eager=*/true, - TriggerMethod::MANUAL)); + RETURN_IF_ERROR( + _engine.submit_compaction_task(tablet, CompactionType::CUMULATIVE_COMPACTION, + false, /*eager=*/true, TriggerMethod::MANUAL)); LOG(INFO) << "Manual debug compaction task is successfully triggered"; *json_result = R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" + diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index e6af7a1ea5f132..3570b03b92037d 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -17,8 +17,6 @@ #include "storage/compaction/compaction.h" -#include "storage/compaction/compaction_task_tracker.h" - #include #include #include @@ -55,6 +53,7 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "storage/compaction/collection_statistics.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/compaction/cumulative_compaction.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" @@ -185,9 +184,8 @@ void Compaction::submit_profile_record(bool success, int64_t start_time_ms, CompletionStats stats; // Fill input_version_range from input rowsets if (!_input_rowsets.empty()) { - stats.input_version_range = - fmt::format("[{}-{}]", _input_rowsets.front()->start_version(), - _input_rowsets.back()->end_version()); + stats.input_version_range = fmt::format("[{}-{}]", _input_rowsets.front()->start_version(), + 
_input_rowsets.back()->end_version()); } stats.end_time_ms = UnixMillis(); stats.merged_rows = _stats.merged_rows; @@ -605,20 +603,20 @@ Status CompactionMixin::execute_compact() { data_dir->disks_compaction_num_increment(-1); }; - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( - ({ - impl_status = execute_compact_impl(permits); - impl_status; - }), - ([&](const doris::Exception& ex) { - on_failure(ex); - // Use the captured Status message if Exception has no message - std::string msg = ex.what(); - if (msg.empty() && !impl_status.ok()) { - msg = impl_status.to_string(); - } - submit_profile_record(false, profile_start_time_ms, msg); - })); + HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(({ + impl_status = execute_compact_impl(permits); + impl_status; + }), + ([&](const doris::Exception& ex) { + on_failure(ex); + // Use the captured Status message if Exception has no message + std::string msg = ex.what(); + if (msg.empty() && !impl_status.ok()) { + msg = impl_status.to_string(); + } + submit_profile_record(false, profile_start_time_ms, + msg); + })); // Only reached on success _tablet->compaction_count.fetch_add(1, std::memory_order_relaxed); data_dir->disks_compaction_score_increment(-permits); diff --git a/be/src/storage/compaction/compaction.h b/be/src/storage/compaction/compaction.h index 35fc6efec24297..975af4d9f278d9 100644 --- a/be/src/storage/compaction/compaction.h +++ b/be/src/storage/compaction/compaction.h @@ -35,11 +35,11 @@ #include "common/status.h" #include "io/io_common.h" #include "runtime/runtime_profile.h" +#include "storage/compaction/compaction_task_tracker.h" #include "storage/merger.h" #include "storage/olap_common.h" #include "storage/rowid_conversion.h" #include "storage/rowset/pending_rowset_helper.h" -#include "storage/compaction/compaction_task_tracker.h" #include "storage/rowset/rowset_fwd.h" #include "storage/tablet/tablet_fwd.h" diff --git a/be/src/storage/compaction/cumulative_compaction.h b/be/src/storage/compaction/cumulative_compaction.h index 
28b71de8c957e3..b4f31f86d635d6 100644 --- a/be/src/storage/compaction/cumulative_compaction.h +++ b/be/src/storage/compaction/cumulative_compaction.h @@ -39,7 +39,9 @@ class CumulativeCompaction final : public CompactionMixin { private: std::string_view compaction_name() const override { return "cumulative compaction"; } - CompactionProfileType profile_type() const override { return CompactionProfileType::CUMULATIVE; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } diff --git a/be/src/storage/compaction/single_replica_compaction.h b/be/src/storage/compaction/single_replica_compaction.h index 9f979530731709..fd0e63cd44cef6 100644 --- a/be/src/storage/compaction/single_replica_compaction.h +++ b/be/src/storage/compaction/single_replica_compaction.h @@ -38,7 +38,9 @@ class SingleReplicaCompaction final : public CompactionMixin { Status execute_compact() override; inline CompactionType real_compact_type() const { return _compaction_type; } - CompactionProfileType profile_type() const override { return CompactionProfileType::SINGLE_REPLICA; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::SINGLE_REPLICA; + } protected: std::string_view compaction_name() const override { return "single replica compaction"; } diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index f681889c8f86e8..35903fe7e2af7f 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -49,7 +49,6 @@ #include "agent/utils.h" #include "common/config.h" -#include "service/backend_options.h" #include "common/logging.h" #include "common/metrics/doris_metrics.h" #include "common/metrics/metrics.h" @@ -59,6 +58,7 @@ #include "io/fs/path.h" #include "runtime/memory/cache_manager.h" #include "runtime/memory/global_memory_arbitrator.h" +#include 
"service/backend_options.h" #include "storage/compaction/cold_data_compaction.h" #include "storage/compaction/compaction_permit_limiter.h" #include "storage/compaction/cumulative_compaction.h" @@ -1145,11 +1145,10 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, << ", max_threads: " << thread_pool->max_threads() << ", min_threads: " << thread_pool->min_threads() << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); - auto status = thread_pool->submit_func( - [=, compaction = std::move(compaction), this]() { - _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, - permits, force); - }); + auto status = thread_pool->submit_func([=, compaction = std::move(compaction), this]() { + _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, permits, + force); + }); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( _cumu_compaction_thread_pool->get_queue_size()); @@ -1290,8 +1289,7 @@ void StorageEngine::_handle_compaction(TabletSharedPtr tablet, } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force, bool eager, - TriggerMethod trigger_method) { + bool force, bool eager, TriggerMethod trigger_method) { if (!eager) { DCHECK(compaction_type == CompactionType::BASE_COMPACTION || compaction_type == CompactionType::CUMULATIVE_COMPACTION); diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp index 0981a5a7a12c6f..f5451c300e393e 100644 --- a/be/test/cloud/cloud_compaction_test.cpp +++ b/be/test/cloud/cloud_compaction_test.cpp @@ -240,7 +240,9 @@ class TestableCloudCompaction : public CloudCompactionMixin { std::string_view compaction_name() const override { return "test_compaction"; } - CompactionProfileType profile_type() const override { return CompactionProfileType::CUMULATIVE; } + 
CompactionProfileType profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } }; TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) { diff --git a/be/test/storage/compaction/compaction_task_tracker_test.cpp b/be/test/storage/compaction/compaction_task_tracker_test.cpp index 4f2e1ba008f898..45c2d1c27224fa 100644 --- a/be/test/storage/compaction/compaction_task_tracker_test.cpp +++ b/be/test/storage/compaction/compaction_task_tracker_test.cpp @@ -77,8 +77,7 @@ class CompactionTaskTrackerTest : public testing::Test { } // Helper to find a task by ID in a vector - const CompactionTaskInfo* find_task(const std::vector& tasks, - int64_t id) { + const CompactionTaskInfo* find_task(const std::vector& tasks, int64_t id) { for (const auto& t : tasks) { if (t.compaction_id == id) return &t; } @@ -226,8 +225,8 @@ TEST_F(CompactionTaskTrackerTest, FallbackRecord_NoRegister) { EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); EXPECT_EQ(task->trigger_method, TriggerMethod::BACKGROUND); // default EXPECT_EQ(task->scheduled_time_ms, cs.end_time_ms); // fallback - EXPECT_EQ(task->compaction_score, 0); // not available - EXPECT_EQ(task->merged_rows, 100); // completion stats present + EXPECT_EQ(task->compaction_score, 0); // not available + EXPECT_EQ(task->merged_rows, 100); // completion stats present } TEST_F(CompactionTaskTrackerTest, FallbackRecord_FailWithStatusMsg) {