Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2035,8 +2035,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&

std::set<TTabletId> error_tablet_ids;
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
// partition_id, tablet_id, publish_version, commit_tso
std::vector<DiscontinuousVersionTablet> discontinuous_version_tablets;
std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
Expand Down Expand Up @@ -2093,8 +2093,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
}

for (auto& item : discontinuous_version_tablets) {
_engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
publish_version_req.transaction_id, false);
_engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version,
publish_version_req.transaction_id, false, item.commit_tso);
}
TFinishTaskRequest finish_task_request;
if (!status.ok()) [[unlikely]] {
Expand Down
12 changes: 12 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
}

void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
Expand Down Expand Up @@ -192,6 +195,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
Expand Down Expand Up @@ -281,6 +287,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
Expand Down Expand Up @@ -359,6 +368,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
}

TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/information_schema/schema_rowsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
{"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
{"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
{"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},

};

Expand Down Expand Up @@ -268,6 +269,16 @@ Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
// COMMIT_TSO
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->commit_tso();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas));
}

_rowsets_idx += fill_rowsets_num;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/http/action/pad_rowset_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Status PadRowsetAction::_pad_rowset(Tablet* tablet, const Version& version) {
auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false));
RowsetSharedPtr rowset;
RETURN_IF_ERROR(writer->build(rowset));
rowset->make_visible(version);
rowset->make_visible(version, -1);

std::vector<RowsetSharedPtr> to_add {rowset};
std::vector<RowsetSharedPtr> to_delete;
Expand Down
29 changes: 15 additions & 14 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,20 +483,21 @@ Status DataDir::load() {
}
}

auto load_pending_publish_info_func =
[&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed = pending_publish_info_pb.ParseFromArray(info.data(),
cast_set<int>(info.size()));
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
publish_version,
pending_publish_info_pb.transaction_id(), true);
return true;
};
auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id,
int64_t publish_version,
std::string_view info) {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed =
pending_publish_info_pb.ParseFromArray(info.data(), cast_set<int>(info.size()));
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
publish_version, pending_publish_info_pb.transaction_id(),
true, pending_publish_info_pb.commit_tso());
return true;
};
MonotonicStopWatch pending_publish_timer;
pending_publish_timer.start();
RETURN_IF_ERROR(
Expand Down
12 changes: 7 additions & 5 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,7 @@ void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) {

void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
int64_t publish_version, int64_t transaction_id,
bool is_recovery) {
bool is_recovery, int64_t commit_tso) {
if (!is_recovery) {
bool exists = false;
{
Expand All @@ -1685,6 +1685,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
PendingPublishInfoPB pending_publish_info_pb;
pending_publish_info_pb.set_partition_id(partition_id);
pending_publish_info_pb.set_transaction_id(transaction_id);
pending_publish_info_pb.set_commit_tso(commit_tso);
static_cast<void>(TabletMetaManager::save_pending_publish_info(
tablet->data_dir(), tablet->tablet_id(), publish_version,
pending_publish_info_pb.SerializeAsString()));
Expand All @@ -1693,7 +1694,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
<< " version: " << publish_version << " txn_id:" << transaction_id
<< " is_recovery: " << is_recovery;
std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso};
}

int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
Expand Down Expand Up @@ -1730,8 +1731,9 @@ void StorageEngine::_process_async_publish() {

auto task_iter = tablet_iter->second.begin();
int64_t version = task_iter->first;
int64_t transaction_id = task_iter->second.first;
int64_t partition_id = task_iter->second.second;
int64_t transaction_id = std::get<0>(task_iter->second);
int64_t partition_id = std::get<1>(task_iter->second);
int64_t commit_tso = std::get<2>(task_iter->second);
int64_t max_version = tablet->max_version().second;

if (version <= max_version) {
Expand All @@ -1753,7 +1755,7 @@ void StorageEngine::_process_async_publish() {
}

auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
*this, tablet, partition_id, transaction_id, version);
*this, tablet, partition_id, transaction_id, version, commit_tso);
static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
[=]() { async_publish_task->handle(); }));
tablet_iter->second.erase(task_iter);
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status Rowset::load(bool use_cache) {
return Status::OK();
}

void Rowset::make_visible(Version version) {
void Rowset::make_visible(Version version, int64_t commit_tso) {
_is_pending = false;
_rowset_meta->set_version(version);
_rowset_meta->set_rowset_state(VISIBLE);
Expand All @@ -95,6 +95,7 @@ void Rowset::make_visible(Version version) {
if (_rowset_meta->has_delete_predicate()) {
_rowset_meta->mutable_delete_predicate()->set_version(cast_set<int32_t>(version.first));
}
_rowset_meta->set_commit_tso(commit_tso);
}

void Rowset::set_version(Version version) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
const std::string& tablet_path() const { return _tablet_path; }

// publish rowset to make it visible to read
void make_visible(Version version);
void make_visible(Version version, int64_t commit_tso);
void set_version(Version version);
const TabletSchemaSPtr& tablet_schema() const;

Expand All @@ -166,6 +166,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
// The writing time of the newest data in rowset, to measure the freshness of a rowset.
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
// The commit tso of the newest data in rowset.
int64_t commit_tso() const { return rowset_meta()->commit_tso(); }

bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
[algorithm]() -> Result<EncryptionAlgorithmPB> { return algorithm; });
}

int64_t commit_tso() const { return _rowset_meta_pb.commit_tso(); }

void set_commit_tso(int64_t commit_tso) { _rowset_meta_pb.set_commit_tso(commit_tso); }

void set_cloud_fields_after_visible(int64_t visible_version, int64_t version_update_time_ms) {
// Update rowset meta with correct version and visible_ts
// !!ATTENTION!!: this code should be updated if there are more fields
Expand Down
7 changes: 4 additions & 3 deletions be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class StorageEngine final : public BaseStorageEngine {
void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);

void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
int64_t transaction_id, bool is_recover);
int64_t transaction_id, bool is_recover, int64_t commit_tso);
int64_t get_pending_publish_min_version(int64_t tablet_id);

bool add_broken_path(std::string path);
Expand Down Expand Up @@ -583,8 +583,9 @@ class StorageEngine final : public BaseStorageEngine {

std::mutex _cumu_compaction_delay_mtx;

// tablet_id, publish_version, transaction_id, partition_id
std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
// tablet_id, publish_version, transaction_id, partition_id, commit_tso
std::map<int64_t, std::map<int64_t, std::tuple<int64_t, int64_t, int64_t>>>
_async_publish_tasks;
// aync publish for discontinuous versions of merge_on_write table
std::shared_ptr<Thread> _async_publish_thread;
std::shared_mutex _async_publish_lock;
Expand Down
Loading
Loading