Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,11 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
tablet_meta_info.time_series_compaction_level_threshold);
need_to_save = true;
}
if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) {
tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group(
tablet_meta_info.vertical_compaction_num_columns_per_group);
need_to_save = true;
}
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,14 @@ Status CloudTablet::sync_meta() {
_tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(
new_disable_auto_compaction);
}
// Sync vertical_compaction_num_columns_per_group
auto new_vertical_compaction_num_columns_per_group =
tablet_meta->vertical_compaction_num_columns_per_group();
if (_tablet_meta->vertical_compaction_num_columns_per_group() !=
new_vertical_compaction_num_columns_per_group) {
_tablet_meta->set_vertical_compaction_num_columns_per_group(
new_vertical_compaction_num_columns_per_group);
}

return Status::OK();
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, const TabletMetaPB& in)
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_vertical_compaction_num_columns_per_group(
in.vertical_compaction_num_columns_per_group());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
Expand Down Expand Up @@ -712,6 +714,8 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, TabletMetaPB&& in) {
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_vertical_compaction_num_columns_per_group(
in.vertical_compaction_num_columns_per_group());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
Expand Down Expand Up @@ -796,6 +800,8 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, const TabletMetaCloudPB& in)
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_vertical_compaction_num_columns_per_group(
in.vertical_compaction_num_columns_per_group());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
Expand Down Expand Up @@ -873,6 +879,8 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, TabletMetaCloudPB&& in) {
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_vertical_compaction_num_columns_per_group(
in.vertical_compaction_num_columns_per_group());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
Expand Down
34 changes: 30 additions & 4 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
// unique_key should consider sequence&delete column
void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups,
std::vector<uint32_t>* key_group_cluster_key_idxes) {
std::vector<uint32_t>* key_group_cluster_key_idxes,
int32_t num_columns_per_group) {
size_t num_key_cols = tablet_schema.num_key_columns();
size_t total_cols = tablet_schema.num_columns();
std::vector<uint32_t> key_columns;
Expand Down Expand Up @@ -227,8 +228,7 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
continue;
}

if (!value_columns.empty() &&
value_columns.size() % config::vertical_compaction_num_columns_per_group == 0) {
if (!value_columns.empty() && value_columns.size() % num_columns_per_group == 0) {
column_groups->push_back(value_columns);
value_columns.clear();
}
Expand Down Expand Up @@ -479,7 +479,33 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
std::vector<uint32_t> key_group_cluster_key_idxes;
vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes);
// If BE config vertical_compaction_num_columns_per_group has been modified from
// its default value (5), use the BE config; otherwise use the tablet meta value.
constexpr int32_t default_num_columns_per_group = 5;
int32_t num_columns_per_group =
config::vertical_compaction_num_columns_per_group != default_num_columns_per_group
? config::vertical_compaction_num_columns_per_group
: tablet->tablet_meta()->vertical_compaction_num_columns_per_group();

DBUG_EXECUTE_IF("Merger.vertical_merge_rowsets.check_num_columns_per_group", {
auto expected_value = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"Merger.vertical_merge_rowsets.check_num_columns_per_group", "expected_value", -1);
auto expected_tablet_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
"Merger.vertical_merge_rowsets.check_num_columns_per_group", "tablet_id", -1);
if (expected_tablet_id != -1 && expected_tablet_id == tablet->tablet_id()) {
if (expected_value != -1 && expected_value != num_columns_per_group) {
LOG(FATAL) << "DEBUG_POINT CHECK FAILED: expected num_columns_per_group="
<< expected_value << " but got " << num_columns_per_group
<< " for tablet_id=" << tablet->tablet_id();
} else {
LOG(INFO) << "DEBUG_POINT CHECK PASSED: num_columns_per_group="
<< num_columns_per_group << ", tablet_id=" << tablet->tablet_id();
}
}
});

vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes,
num_columns_per_group);

// Calculate total rows for density calculation after compaction
int64_t total_rows = 0;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class Merger {
// for vertical compaction
static void vertical_split_columns(const TabletSchema& tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups,
std::vector<uint32_t>* key_group_cluster_key_idxes);
std::vector<uint32_t>* key_group_cluster_key_idxes,
int32_t num_columns_per_group);
static Status vertical_compact_one_group(
BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
bool is_key, const std::vector<uint32_t>& column_group,
Expand Down
12 changes: 10 additions & 2 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "absl/strings/substitute.h"
#include "beta_rowset_writer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
Expand Down Expand Up @@ -281,8 +282,15 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt

std::vector<std::vector<uint32_t>> column_groups;
std::vector<uint32_t> key_group_cluster_key_idxes;
Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups,
&key_group_cluster_key_idxes);
// If BE config vertical_compaction_num_columns_per_group has been modified from
// its default value (5), use the BE config; otherwise use the tablet meta value.
constexpr int32_t default_num_columns_per_group = 5;
int32_t num_columns_per_group =
config::vertical_compaction_num_columns_per_group != default_num_columns_per_group
? config::vertical_compaction_num_columns_per_group
: tablet->tablet_meta()->vertical_compaction_num_columns_per_group();
Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups, &key_group_cluster_key_idxes,
num_columns_per_group);
vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
ReaderType::READER_SEGMENT_COMPACTION);

Expand Down
18 changes: 15 additions & 3 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ TabletMetaSharedPtr TabletMeta::create(
request.time_series_compaction_time_threshold_seconds,
request.time_series_compaction_empty_rowsets_threshold,
request.time_series_compaction_level_threshold, inverted_index_file_storage_format,
request.tde_algorithm, storage_format);
request.tde_algorithm, storage_format,
request.__isset.vertical_compaction_num_columns_per_group
? request.vertical_compaction_num_columns_per_group
: 5);
}

TabletMeta::~TabletMeta() {
Expand Down Expand Up @@ -154,7 +157,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
int64_t time_series_compaction_level_threshold,
TInvertedIndexFileStorageFormat::type inverted_index_file_storage_format,
TEncryptionAlgorithm::type tde_algorithm,
TStorageFormat::type storage_format)
TStorageFormat::type storage_format,
int32_t vertical_compaction_num_columns_per_group)
: _tablet_uid(0, 0),
_schema(new TabletSchema),
_delete_bitmap(new DeleteBitmap(tablet_id)),
Expand Down Expand Up @@ -187,6 +191,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
time_series_compaction_empty_rowsets_threshold);
tablet_meta_pb.set_time_series_compaction_level_threshold(
time_series_compaction_level_threshold);
tablet_meta_pb.set_vertical_compaction_num_columns_per_group(
vertical_compaction_num_columns_per_group);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
Expand Down Expand Up @@ -454,7 +460,9 @@ TabletMeta::TabletMeta(const TabletMeta& b)
b._time_series_compaction_time_threshold_seconds),
_time_series_compaction_empty_rowsets_threshold(
b._time_series_compaction_empty_rowsets_threshold),
_time_series_compaction_level_threshold(b._time_series_compaction_level_threshold) {};
_time_series_compaction_level_threshold(b._time_series_compaction_level_threshold),
_vertical_compaction_num_columns_per_group(
b._vertical_compaction_num_columns_per_group) {};

void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
ColumnPB* column) {
Expand Down Expand Up @@ -847,6 +855,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
tablet_meta_pb.time_series_compaction_empty_rowsets_threshold();
_time_series_compaction_level_threshold =
tablet_meta_pb.time_series_compaction_level_threshold();
_vertical_compaction_num_columns_per_group =
tablet_meta_pb.vertical_compaction_num_columns_per_group();

if (tablet_meta_pb.has_encryption_algorithm()) {
_encryption_algorithm = tablet_meta_pb.encryption_algorithm();
Expand Down Expand Up @@ -942,6 +952,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb, bool cloud_get_rowset_
time_series_compaction_empty_rowsets_threshold());
tablet_meta_pb->set_time_series_compaction_level_threshold(
time_series_compaction_level_threshold());
tablet_meta_pb->set_vertical_compaction_num_columns_per_group(
vertical_compaction_num_columns_per_group());

tablet_meta_pb->set_encryption_algorithm(_encryption_algorithm);
}
Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
TInvertedIndexFileStorageFormat::type inverted_index_file_storage_format =
TInvertedIndexFileStorageFormat::V2,
TEncryptionAlgorithm::type tde_algorithm = TEncryptionAlgorithm::PLAINTEXT,
TStorageFormat::type storage_format = TStorageFormat::V2);
TStorageFormat::type storage_format = TStorageFormat::V2,
int32_t vertical_compaction_num_columns_per_group = 5);
// If need add a filed in TableMeta, filed init copy in copy construct function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
Expand Down Expand Up @@ -296,6 +297,13 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
return _time_series_compaction_level_threshold;
}

void set_vertical_compaction_num_columns_per_group(int32_t num) {
_vertical_compaction_num_columns_per_group = num;
}
int32_t vertical_compaction_num_columns_per_group() const {
return _vertical_compaction_num_columns_per_group;
}

int64_t ttl_seconds() const {
std::shared_lock rlock(_meta_lock);
return _ttl_seconds;
Expand Down Expand Up @@ -366,6 +374,7 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
int64_t _time_series_compaction_time_threshold_seconds = 0;
int64_t _time_series_compaction_empty_rowsets_threshold = 0;
int64_t _time_series_compaction_level_threshold = 0;
int32_t _vertical_compaction_num_columns_per_group = 5;

int64_t _avg_rs_meta_serialize_size = 0;

Expand Down
3 changes: 3 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,9 @@ void MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
}
}
}
} else if (tablet_meta_info.has_vertical_compaction_num_columns_per_group()) {
tablet_meta.set_vertical_compaction_num_columns_per_group(
tablet_meta_info.vertical_compaction_num_columns_per_group());
}
int64_t table_id = tablet_meta.table_id();
int64_t index_id = tablet_meta.index_id();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private void createRollupReplicaForPartition(OlapTable tbl) throws Exception {
tbl.rowStorePageSize(),
tbl.variantEnableFlattenNested(), null,
tbl.storagePageSize(), tbl.getTDEAlgorithmPB(),
tbl.storageDictPageSize(), true);
tbl.storageDictPageSize(), true,
tbl.getVerticalCompactionNumColumnsPerGroup());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
requestBuilder.setDbId(dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ private void createShadowIndexReplicaForPartition(OlapTable tbl) throws Exceptio
tbl.rowStorePageSize(),
tbl.variantEnableFlattenNested(), clusterKeyUids,
tbl.storagePageSize(), tbl.getTDEAlgorithmPB(),
tbl.storageDictPageSize(), true);
tbl.storageDictPageSize(), true,
tbl.getVerticalCompactionNumColumnsPerGroup());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
requestBuilder.setDbId(dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ protected void createRollupReplica() throws AlterCancelException {
tbl.rowStorePageSize(),
tbl.variantEnableFlattenNested(),
tbl.storagePageSize(), tbl.getTDEAlgorithm(),
tbl.storageDictPageSize());
tbl.storageDictPageSize(),
tbl.getVerticalCompactionNumColumnsPerGroup());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down
Loading
Loading