Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class CloudBaseCompaction : public CloudCompactionMixin {
Status request_global_lock();

void do_lease();
CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; }

private:
Status pick_rowsets_to_compact();

std::string_view compaction_name() const override { return "CloudBaseCompaction"; }
int64_t input_segments_num() const override { return _input_segments; }

Status modify_rowsets() override;

Expand Down
10 changes: 5 additions & 5 deletions be/src/cloud/cloud_compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "absl/strings/substitute.h"
#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_compaction_action.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_tablet.h"
Expand Down Expand Up @@ -168,10 +167,11 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri
<< " table id: " << table_id;
// 3. submit compaction task
RETURN_IF_ERROR(_engine.submit_compaction_task(
tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION
: compaction_type == PARAM_COMPACTION_CUMULATIVE
? CompactionType::CUMULATIVE_COMPACTION
: CompactionType::FULL_COMPACTION));
tablet,
compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION
: compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION
: CompactionType::FULL_COMPACTION,
TriggerMethod::MANUAL));

LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id
<< " table id: " << table_id;
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

int64_t get_input_rowsets_bytes() const { return _input_rowsets_total_size; }
int64_t get_input_num_rows() const { return _input_row_num; }
CompactionProfileType profile_type() const override {
return CompactionProfileType::CUMULATIVE;
}

private:
Status pick_rowsets_to_compact();

std::string_view compaction_name() const override { return "CloudCumulativeCompaction"; }
int64_t input_segments_num() const override { return _input_segments; }

Status modify_rowsets() override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_full_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ class CloudFullCompaction : public CloudCompactionMixin {
Status request_global_lock();

void do_lease();
CompactionProfileType profile_type() const override { return CompactionProfileType::FULL; }

protected:
Status pick_rowsets_to_compact();

std::string_view compaction_name() const override { return "CloudFullCompaction"; }
int64_t input_segments_num() const override { return _input_segments; }

Status modify_rowsets() override;
Status garbage_collection() override;
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_index_change_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin {

bool is_base_compaction() const { return _compact_type == cloud::TabletCompactionJobPB::BASE; }

CompactionProfileType profile_type() const override {
return CompactionProfileType::INDEX_CHANGE;
}

Status rebuild_tablet_schema() override;

private:
Expand All @@ -55,6 +59,7 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin {

protected:
std::string_view compaction_name() const override { return "CloudIndexChangeCompaction"; }
int64_t input_segments_num() const override { return _input_segments; }

// if cumu rowset is modified, cumu compaction should sync rowset before execute.
// if base rowset is modified, base compaction should sync rowset before execute.
Expand Down
130 changes: 120 additions & 10 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "io/io_common.h"
#include "load/memtable/memtable_flush_executor.h"
#include "runtime/memory/cache_manager.h"
#include "service/backend_options.h"
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "storage/storage_policy.h"
Expand Down Expand Up @@ -793,7 +794,8 @@ Status CloudStorageEngine::_request_tablet_global_compaction_lock(
}
}

Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand All @@ -816,6 +818,29 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
_submitted_base_compactions.erase(tablet->tablet_id());
return st;
}

// Register task as PENDING in the tracker after successful prepare
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.backend_id = BackendOptions::get_backend_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.tablet_id = tablet->tablet_id();
info.compaction_type = compaction->profile_type();
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = trigger_method;
info.compaction_score = tablet->get_cloud_base_compaction_score();
info.scheduled_time_ms = UnixMillis();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
info.input_version_range = compaction->input_version_range_str();
CompactionTaskTracker::instance()->register_task(std::move(info));
}

{
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions[tablet->tablet_id()] = compaction;
Expand All @@ -836,13 +861,24 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
}};
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet,
compaction);
if (!st.ok()) return;
if (!st.ok()) {
CompactionTaskTracker::instance()->remove_task(compaction_id);
return;
}
// Update tracker to RUNNING when execution starts
{
RunningStats stats;
stats.start_time_ms = UnixMillis();
stats.is_vertical = compaction->is_vertical();
CompactionTaskTracker::instance()->update_to_running(compaction_id, stats);
}
st = compaction->execute_compact();
if (!st.ok()) {
// Error log has been output in `execute_compact`
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
tablet->set_last_base_compaction_failure_time(now);
}
// submit_profile_record() inside execute_compact() handles complete/fail in tracker
std::lock_guard lock(_compaction_mtx);
_executing_base_compactions.erase(tablet->tablet_id());
});
Expand All @@ -851,13 +887,15 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
if (!st.ok()) {
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions.erase(tablet->tablet_id());
CompactionTaskTracker::instance()->remove_task(compaction_id);
return Status::InternalError("failed to submit base compaction, tablet_id={}",
tablet->tablet_id());
}
return st;
}

Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand Down Expand Up @@ -890,6 +928,29 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
_tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
return st;
}

// Register task as PENDING in the tracker after successful prepare
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.backend_id = BackendOptions::get_backend_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.tablet_id = tablet->tablet_id();
info.compaction_type = compaction->profile_type();
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = trigger_method;
info.compaction_score = tablet->get_cloud_cumu_compaction_score();
info.scheduled_time_ms = UnixMillis();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
info.input_version_range = compaction->input_version_range_str();
CompactionTaskTracker::instance()->register_task(std::move(info));
}

{
std::lock_guard lock(_compaction_mtx);
_tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
Expand Down Expand Up @@ -952,7 +1013,17 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
}};
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
tablet, compaction);
if (!st.ok()) return;
if (!st.ok()) {
CompactionTaskTracker::instance()->remove_task(compaction_id);
return;
}
// Update tracker to RUNNING when execution starts
{
RunningStats stats;
stats.start_time_ms = UnixMillis();
stats.is_vertical = compaction->is_vertical();
CompactionTaskTracker::instance()->update_to_running(compaction_id, stats);
}
do {
std::lock_guard lock(_cumu_compaction_delay_mtx);
_cumu_compaction_thread_pool_used_threads++;
Expand Down Expand Up @@ -989,6 +1060,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
.tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
.tag("small_tasks_running",
_cumu_compaction_thread_pool_small_tasks_running);
CompactionTaskTracker::instance()->remove_task(compaction_id);
return;
}
}
Expand All @@ -1005,13 +1077,15 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
_cumu_compaction_thread_pool->get_queue_size());
if (!st.ok()) {
erase_submitted_cumu_compaction();
CompactionTaskTracker::instance()->remove_task(compaction_id);
return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
tablet->tablet_id());
}
return st;
}

Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand All @@ -1033,6 +1107,29 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
_submitted_full_compactions.erase(tablet->tablet_id());
return st;
}

// Register task as PENDING in the tracker after successful prepare
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.backend_id = BackendOptions::get_backend_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.tablet_id = tablet->tablet_id();
info.compaction_type = compaction->profile_type();
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = trigger_method;
info.compaction_score = tablet->get_cloud_base_compaction_score();
info.scheduled_time_ms = UnixMillis();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
info.input_version_range = compaction->input_version_range_str();
CompactionTaskTracker::instance()->register_task(std::move(info));
}

{
std::lock_guard lock(_compaction_mtx);
_submitted_full_compactions[tablet->tablet_id()] = compaction;
Expand All @@ -1047,39 +1144,52 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
}};
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
compaction);
if (!st.ok()) return;
if (!st.ok()) {
CompactionTaskTracker::instance()->remove_task(compaction_id);
return;
}
// Update tracker to RUNNING when execution starts
{
RunningStats stats;
stats.start_time_ms = UnixMillis();
stats.is_vertical = compaction->is_vertical();
CompactionTaskTracker::instance()->update_to_running(compaction_id, stats);
}
st = compaction->execute_compact();
if (!st.ok()) {
// Error log has been output in `execute_compact`
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
tablet->set_last_full_compaction_failure_time(now);
}
// submit_profile_record() inside execute_compact() handles complete/fail in tracker
std::lock_guard lock(_compaction_mtx);
_executing_full_compactions.erase(tablet->tablet_id());
});
if (!st.ok()) {
std::lock_guard lock(_compaction_mtx);
_submitted_full_compactions.erase(tablet->tablet_id());
CompactionTaskTracker::instance()->remove_task(compaction_id);
return Status::InternalError("failed to submit full compaction, tablet_id={}",
tablet->tablet_id());
}
return st;
}

Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
CompactionType compaction_type) {
CompactionType compaction_type,
TriggerMethod trigger_method) {
DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION ||
compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::FULL_COMPACTION);
switch (compaction_type) {
case CompactionType::BASE_COMPACTION:
RETURN_IF_ERROR(_submit_base_compaction_task(tablet));
RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method));
return Status::OK();
case CompactionType::CUMULATIVE_COMPACTION:
RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet));
RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method));
return Status::OK();
case CompactionType::FULL_COMPACTION:
RETURN_IF_ERROR(_submit_full_compaction_task(tablet));
RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method));
return Status::OK();
default:
return Status::InternalError("unknown compaction type!");
Expand Down
12 changes: 8 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
void get_cumu_compaction(int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);

Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type);
Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type,
TriggerMethod trigger_method = TriggerMethod::BACKGROUND);

Status get_compaction_status_json(std::string* result);

Expand Down Expand Up @@ -199,9 +200,12 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type,
bool check_score);
Status _adjust_compaction_thread_num();
Status _submit_base_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_full_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_base_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method);
Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method);
Status _submit_full_compaction_task(const CloudTabletSPtr& tablet,
TriggerMethod trigger_method);
Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
const CloudTabletSPtr& tablet,
std::shared_ptr<CloudCompactionMixin> compaction);
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ DEFINE_Bool(enable_low_cardinality_cache_code, "true");
DEFINE_mBool(enable_compaction_checksum, "false");
// whether disable automatic compaction task
DEFINE_mBool(disable_auto_compaction, "false");
// max number of compaction task records to keep in memory, 0 to disable
DEFINE_mInt32(compaction_profile_max_records, "500");
// whether enable vertical compaction
DEFINE_mBool(enable_vertical_compaction, "true");
// whether enable ordered data compaction
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ DECLARE_Bool(enable_low_cardinality_cache_code);
DECLARE_mBool(enable_compaction_checksum);
// whether disable automatic compaction task
DECLARE_mBool(disable_auto_compaction);
// max number of compaction task records to keep in memory, 0 to disable
DECLARE_mInt32(compaction_profile_max_records);
// whether enable vertical compaction
DECLARE_mBool(enable_vertical_compaction);
// whether enable ordered data compaction
Expand Down
Loading
Loading