Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>

#include "common/status.h"
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
Expand Down Expand Up @@ -397,10 +398,44 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
std::atomic<int64_t> compaction_count = 0;

CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED;
std::mutex sample_info_lock;
std::vector<CompactionSampleInfo> sample_infos;
// Separate sample_infos for each compaction type to avoid race condition
// when different types of compaction run concurrently on the same tablet
std::mutex cumu_sample_info_lock;
std::mutex base_sample_info_lock;
std::mutex full_sample_info_lock;
std::vector<CompactionSampleInfo> cumu_sample_infos;
std::vector<CompactionSampleInfo> base_sample_infos;
std::vector<CompactionSampleInfo> full_sample_infos;
Status last_compaction_status = Status::OK();

std::mutex& get_sample_info_lock(ReaderType reader_type) {
switch (reader_type) {
case ReaderType::READER_CUMULATIVE_COMPACTION:
return cumu_sample_info_lock;
case ReaderType::READER_BASE_COMPACTION:
return base_sample_info_lock;
case ReaderType::READER_FULL_COMPACTION:
return full_sample_info_lock;
default:
// For other compaction types, use base_sample_info_lock as default
return base_sample_info_lock;
}
}

std::vector<CompactionSampleInfo>& get_sample_infos(ReaderType reader_type) {
switch (reader_type) {
case ReaderType::READER_CUMULATIVE_COMPACTION:
return cumu_sample_infos;
case ReaderType::READER_BASE_COMPACTION:
return base_sample_infos;
case ReaderType::READER_FULL_COMPACTION:
return full_sample_infos;
default:
// For other compaction types, use base_sample_infos as default
return base_sample_infos;
}
}

// Density ratio for sparse optimization (non_null_cells / total_cells)
// Value range: [0.0, 1.0], smaller value means more sparse
// Default 1.0 means no history data, will not enable sparse optimization initially
Expand Down
33 changes: 19 additions & 14 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,12 @@ Status Merger::vertical_compact_one_group(
return Status::OK();
}

int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
CompactionSampleInfo info = tablet->sample_infos[group_index];
int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt,
ReaderType reader_type) {
auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
auto& sample_infos = tablet->get_sample_infos(reader_type);
std::unique_lock<std::mutex> lock(sample_info_lock);
CompactionSampleInfo info = sample_infos[group_index];
if (way_cnt <= 0) {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " way cnt: " << way_cnt;
Expand All @@ -431,12 +434,12 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_
group_data_size =
int64_t((cast_set<double>(info.group_data_size) * (1 - smoothing_factor)) +
(cast_set<double>(info.bytes / info.rows) * smoothing_factor));
tablet->sample_infos[group_index].group_data_size = group_data_size;
sample_infos[group_index].group_data_size = group_data_size;
} else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
group_data_size = info.group_data_size;
} else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
group_data_size = info.bytes / info.rows;
tablet->sample_infos[group_index].group_data_size = group_data_size;
sample_infos[group_index].group_data_size = group_data_size;
} else {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " << info.group_data_size
Expand All @@ -450,8 +453,8 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_
return 4096 - 32;
}

tablet->sample_infos[group_index].bytes = 0;
tablet->sample_infos[group_index].rows = 0;
sample_infos[group_index].bytes = 0;
sample_infos[group_index].rows = 0;

int64_t batch_size = block_mem_limit / group_data_size;
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L));
Expand Down Expand Up @@ -509,17 +512,19 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
if (stats_output != nullptr) {
total_stats.rowid_conversion = stats_output->rowid_conversion;
}
auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
auto& sample_infos = tablet->get_sample_infos(reader_type);
{
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
tablet->sample_infos.resize(column_groups.size());
std::unique_lock<std::mutex> lock(sample_info_lock);
sample_infos.resize(column_groups.size());
}
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num);
: estimate_batch_size(i, tablet, merge_way_num, reader_type);
CompactionSampleInfo sample_info;
Merger::Statistics group_stats;
group_stats.rowid_conversion = total_stats.rowid_conversion;
Expand All @@ -529,8 +534,8 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, group_stats_ptr,
key_group_cluster_key_idxes, batch_size, &sample_info, enable_sparse_optimization);
{
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
tablet->sample_infos[i] = sample_info;
std::unique_lock<std::mutex> lock(sample_info_lock);
sample_infos[i] = sample_info;
}
RETURN_IF_ERROR(st);
if (stats_output != nullptr) {
Expand All @@ -556,9 +561,9 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
// density = (total_cells - total_null_count) / total_cells
// Smaller density means more sparse
{
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
std::unique_lock<std::mutex> lock(sample_info_lock);
int64_t total_null_count = 0;
for (const auto& info : tablet->sample_infos) {
for (const auto& info : sample_infos) {
total_null_count += info.null_count;
}
int64_t total_cells = total_rows * tablet_schema.num_columns();
Expand Down
Loading
Loading