diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 2753a527a4b868..306f3c27e2bc9c 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -2035,8 +2035,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& std::set error_tablet_ids; std::map succ_tablets; - // partition_id, tablet_id, publish_version - std::vector> discontinuous_version_tablets; + // partition_id, tablet_id, publish_version, commit_tso + std::vector discontinuous_version_tablets; std::map> table_id_to_tablet_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; @@ -2093,8 +2093,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& } for (auto& item : discontinuous_version_tablets) { - _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item), - publish_version_req.transaction_id, false); + _engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version, + publish_version_req.transaction_id, false, item.commit_tso); } TFinishTaskRequest finish_task_request; if (!status.ok()) [[unlikely]] { diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp index 157a303c2c6a45..3292b097d729ee 100644 --- a/be/src/cloud/pb_convert.cpp +++ b/be/src/cloud/pb_convert.cpp @@ -113,6 +113,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in) if (in.has_job_id()) { out->set_job_id(in.job_id()); } + if (in.has_commit_tso()) { + out->set_commit_tso(in.commit_tso()); + } } void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { @@ -192,6 +195,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { if (in.has_job_id()) { out->set_job_id(in.job_id()); } + if (in.has_commit_tso()) { + out->set_commit_tso(in.commit_tso()); + } } RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) { @@ -281,6 +287,9 @@ void 
cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) if (in.has_job_id()) { out->set_job_id(in.job_id()); } + if (in.has_commit_tso()) { + out->set_commit_tso(in.commit_tso()); + } } void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { @@ -359,6 +368,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { if (in.has_job_id()) { out->set_job_id(in.job_id()); } + if (in.has_commit_tso()) { + out->set_commit_tso(in.commit_tso()); + } } TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) { diff --git a/be/src/information_schema/schema_rowsets_scanner.cpp b/be/src/information_schema/schema_rowsets_scanner.cpp index 20afc85fa79449..179d38ef381ce3 100644 --- a/be/src/information_schema/schema_rowsets_scanner.cpp +++ b/be/src/information_schema/schema_rowsets_scanner.cpp @@ -63,6 +63,7 @@ std::vector SchemaRowsetsScanner::_s_tbls_columns = { {"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true}, {"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true}, {"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true}, + {"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true}, }; @@ -268,6 +269,16 @@ Status SchemaRowsetsScanner::_fill_block_impl(Block* block) { } RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas)); } + // COMMIT_TSO + { + std::vector srcs(fill_rowsets_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + RowsetSharedPtr rowset = rowsets_[i]; + srcs[i - fill_idx_begin] = rowset->commit_tso(); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas)); + } _rowsets_idx += fill_rowsets_num; return Status::OK(); diff --git a/be/src/service/http/action/pad_rowset_action.cpp b/be/src/service/http/action/pad_rowset_action.cpp index b3c66d91761954..017371af5866ae 100644 --- a/be/src/service/http/action/pad_rowset_action.cpp +++ b/be/src/service/http/action/pad_rowset_action.cpp @@ 
-110,7 +110,7 @@ Status PadRowsetAction::_pad_rowset(Tablet* tablet, const Version& version) { auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false)); RowsetSharedPtr rowset; RETURN_IF_ERROR(writer->build(rowset)); - rowset->make_visible(version); + rowset->make_visible(version, -1); std::vector to_add {rowset}; std::vector to_delete; diff --git a/be/src/storage/data_dir.cpp b/be/src/storage/data_dir.cpp index 9b057259a4ebff..823d7b3cdf02d1 100644 --- a/be/src/storage/data_dir.cpp +++ b/be/src/storage/data_dir.cpp @@ -483,20 +483,21 @@ Status DataDir::load() { } } - auto load_pending_publish_info_func = - [&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) { - PendingPublishInfoPB pending_publish_info_pb; - bool parsed = pending_publish_info_pb.ParseFromArray(info.data(), - cast_set(info.size())); - if (!parsed) { - LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id - << " publish_version: " << publish_version; - } - engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id, - publish_version, - pending_publish_info_pb.transaction_id(), true); - return true; - }; + auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id, + int64_t publish_version, + std::string_view info) { + PendingPublishInfoPB pending_publish_info_pb; + bool parsed = + pending_publish_info_pb.ParseFromArray(info.data(), cast_set(info.size())); + if (!parsed) { + LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id + << " publish_version: " << publish_version; + } + engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id, + publish_version, pending_publish_info_pb.transaction_id(), + true, pending_publish_info_pb.commit_tso()); + return true; + }; MonotonicStopWatch pending_publish_timer; pending_publish_timer.start(); RETURN_IF_ERROR( diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index 
498cc8e6c3115d..5a315b6072b082 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -1660,7 +1660,7 @@ void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) { void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version, int64_t transaction_id, - bool is_recovery) { + bool is_recovery, int64_t commit_tso) { if (!is_recovery) { bool exists = false; { @@ -1685,6 +1685,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ PendingPublishInfoPB pending_publish_info_pb; pending_publish_info_pb.set_partition_id(partition_id); pending_publish_info_pb.set_transaction_id(transaction_id); + pending_publish_info_pb.set_commit_tso(commit_tso); static_cast(TabletMetaManager::save_pending_publish_info( tablet->data_dir(), tablet->tablet_id(), publish_version, pending_publish_info_pb.SerializeAsString())); @@ -1693,7 +1694,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ << " version: " << publish_version << " txn_id:" << transaction_id << " is_recovery: " << is_recovery; std::unique_lock wlock(_async_publish_lock); - _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id}; + _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso}; } int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { @@ -1730,8 +1731,9 @@ void StorageEngine::_process_async_publish() { auto task_iter = tablet_iter->second.begin(); int64_t version = task_iter->first; - int64_t transaction_id = task_iter->second.first; - int64_t partition_id = task_iter->second.second; + int64_t transaction_id = std::get<0>(task_iter->second); + int64_t partition_id = std::get<1>(task_iter->second); + int64_t commit_tso = std::get<2>(task_iter->second); int64_t max_version = tablet->max_version().second; if (version <= max_version) { @@ -1753,7 +1755,7 @@ void 
StorageEngine::_process_async_publish() { } auto async_publish_task = std::make_shared( - *this, tablet, partition_id, transaction_id, version); + *this, tablet, partition_id, transaction_id, version, commit_tso); static_cast(_tablet_publish_txn_thread_pool->submit_func( [=]() { async_publish_task->handle(); })); tablet_iter->second.erase(task_iter); diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp index 1696b83224901b..165766a199cfd8 100644 --- a/be/src/storage/rowset/rowset.cpp +++ b/be/src/storage/rowset/rowset.cpp @@ -84,7 +84,7 @@ Status Rowset::load(bool use_cache) { return Status::OK(); } -void Rowset::make_visible(Version version) { +void Rowset::make_visible(Version version, int64_t commit_tso) { _is_pending = false; _rowset_meta->set_version(version); _rowset_meta->set_rowset_state(VISIBLE); @@ -95,6 +95,7 @@ void Rowset::make_visible(Version version) { if (_rowset_meta->has_delete_predicate()) { _rowset_meta->mutable_delete_predicate()->set_version(cast_set(version.first)); } + _rowset_meta->set_commit_tso(commit_tso); } void Rowset::set_version(Version version) { diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index c37f0d99009880..0618c48612c989 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -140,7 +140,7 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder const std::string& tablet_path() const { return _tablet_path; } // publish rowset to make it visible to read - void make_visible(Version version); + void make_visible(Version version, int64_t commit_tso); void set_version(Version version); const TabletSchemaSPtr& tablet_schema() const; @@ -166,6 +166,9 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); } // The writing time of the newest data in rowset, to measure the freshness of a rowset. 
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); } + // The commit tso of the newest data in rowset. + int64_t commit_tso() const { return rowset_meta()->commit_tso(); } + bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } diff --git a/be/src/storage/rowset/rowset_meta.h b/be/src/storage/rowset/rowset_meta.h index 83a908613c5c26..0a04e52f5d57f9 100644 --- a/be/src/storage/rowset/rowset_meta.h +++ b/be/src/storage/rowset/rowset_meta.h @@ -460,6 +460,10 @@ class RowsetMeta : public MetadataAdder { [algorithm]() -> Result { return algorithm; }); } + int64_t commit_tso() const { return _rowset_meta_pb.commit_tso(); } + + void set_commit_tso(int64_t commit_tso) { _rowset_meta_pb.set_commit_tso(commit_tso); } + void set_cloud_fields_after_visible(int64_t visible_version, int64_t version_update_time_ms) { // Update rowset meta with correct version and visible_ts // !!ATTENTION!!: this code should be updated if there are more fields diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 8b50d1c4d9bf65..35a98967aa9758 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -361,7 +361,7 @@ class StorageEngine final : public BaseStorageEngine { void gc_binlogs(const std::unordered_map& gc_tablet_infos); void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version, - int64_t transaction_id, bool is_recover); + int64_t transaction_id, bool is_recover, int64_t commit_tso); int64_t get_pending_publish_min_version(int64_t tablet_id); bool add_broken_path(std::string path); @@ -583,8 +583,9 @@ class StorageEngine final : public BaseStorageEngine { std::mutex _cumu_compaction_delay_mtx; - // tablet_id, publish_version, transaction_id, partition_id - std::map>> 
_async_publish_tasks; + // tablet_id, publish_version, transaction_id, partition_id, commit_tso + std::map>> + _async_publish_tasks; // aync publish for discontinuous versions of merge_on_write table std::shared_ptr _async_publish_thread; std::shared_mutex _async_publish_lock; diff --git a/be/src/storage/task/engine_publish_version_task.cpp b/be/src/storage/task/engine_publish_version_task.cpp index ced5ac1314a423..22d78f21f94403 100644 --- a/be/src/storage/task/engine_publish_version_task.cpp +++ b/be/src/storage/task/engine_publish_version_task.cpp @@ -76,7 +76,7 @@ void TabletPublishStatistics::record_in_bvar() { EnginePublishVersionTask::EnginePublishVersionTask( StorageEngine& engine, const TPublishVersionRequest& publish_version_req, std::set* error_tablet_ids, std::map* succ_tablets, - std::vector>* discontinuous_version_tablets, + std::vector* discontinuous_version_tablets, std::map>* table_id_to_tablet_id_to_num_delta_rows) : _engine(engine), _publish_version_req(publish_version_req), @@ -224,15 +224,15 @@ Status EnginePublishVersionTask::execute() { tablet->max_continuous_version_from_beginning(&max_continuous_version); if (max_version > 1 && version.first > max_version && max_continuous_version.second != max_version) { - _handle_publish_version_not_continuous(partition_id, tablet_info, - tablet, version, max_version, - first_time_update, res); + _handle_publish_version_not_continuous( + partition_id, tablet_info, tablet, version, + par_ver_info.commit_tso, max_version, first_time_update, res); continue; } } else { _handle_publish_version_not_continuous(partition_id, tablet_info, tablet, - version, max_version, - first_time_update, res); + version, par_ver_info.commit_tso, + max_version, first_time_update, res); continue; } } @@ -248,14 +248,15 @@ Status EnginePublishVersionTask::execute() { auto tablet_publish_txn_ptr = std::make_shared( _engine, this, tablet, rowset, partition_id, transaction_id, version, - tablet_info); + tablet_info, 
par_ver_info.commit_tso); tablet_tasks.push_back(tablet_publish_txn_ptr); auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); #ifndef NDEBUG LOG(INFO) << "transaction_id: " << transaction_id << ", partition id: " << partition_id << ", version: " << version.second << " start to publish version on tablet: " << tablet_info.tablet_id - << ", submit status: " << submit_st.code(); + << ", submit status: " << submit_st.code() + << ", commit_tso: " << par_ver_info.commit_tso; #endif CHECK(submit_st.ok()) << submit_st; } @@ -327,14 +328,16 @@ Status EnginePublishVersionTask::execute() { void EnginePublishVersionTask::_handle_publish_version_not_continuous( int64_t partition_id, const TabletInfo& tablet_info, const TabletSharedPtr& tablet, - const Version& version, int64_t max_version, bool first_time_update, Status& res) { + const Version& version, const int64_t commit_tso, int64_t max_version, + bool first_time_update, Status& res) { if (config::enable_auto_clone_on_mow_publish_missing_version) { LOG_INFO("mow publish submit missing rowset clone task.") .tag("tablet_id", tablet->tablet_id()) .tag("version", version.second) .tag("replica_id", tablet->replica_id()) .tag("partition_id", tablet->partition_id()) - .tag("table_id", tablet->table_id()); + .tag("table_id", tablet->table_id()) + .tag("commit_tso", commit_tso); Status st = _engine.submit_clone_task(tablet.get(), version.second); if (!st) { LOG_WARNING("mow publish failed to submit missing rowset clone task.") @@ -343,7 +346,8 @@ void EnginePublishVersionTask::_handle_publish_version_not_continuous( .tag("version", version.second) .tag("replica_id", tablet->replica_id()) .tag("partition_id", tablet->partition_id()) - .tag("table_id", tablet->table_id()); + .tag("table_id", tablet->table_id()) + .tag("commit_tso", commit_tso); } } add_error_tablet_id(tablet_info.tablet_id); @@ -351,15 +355,18 @@ void EnginePublishVersionTask::_handle_publish_version_not_continuous( // publish and handle 
it through async publish. if (max_version + config::mow_publish_max_discontinuous_version_num < version.first) { _engine.add_async_publish_task(partition_id, tablet_info.tablet_id, version.first, - _publish_version_req.transaction_id, false); + _publish_version_req.transaction_id, false, commit_tso); } else { - _discontinuous_version_tablets->emplace_back(partition_id, tablet_info.tablet_id, - version.first); + _discontinuous_version_tablets->emplace_back( + DiscontinuousVersionTablet {.partition_id = partition_id, + .tablet_id = tablet_info.tablet_id, + .publish_version = version.first, + .commit_tso = commit_tso}); } res = Status::Error( "version not continuous for mow, tablet_id={}, " - "tablet_max_version={}, txn_version={}", - tablet_info.tablet_id, max_version, version.first); + "tablet_max_version={}, txn_version={}, commit_tso={}", + tablet_info.tablet_id, max_version, version.first, commit_tso); int64_t missed_version = max_version + 1; int64_t missed_txn_id = _engine.txn_manager()->get_txn_by_tablet_version(tablet->tablet_id(), missed_version); @@ -369,9 +376,9 @@ void EnginePublishVersionTask::_handle_publish_version_not_continuous( auto msg = fmt::format( "uniq key with merge-on-write version not continuous, " "missed version={}, it's transaction_id={}, current publish " - "version={}, tablet_id={}, transaction_id={}", + "version={}, tablet_id={}, transaction_id={}, commit_tso={}", missed_version, missed_txn_id, version.second, tablet->tablet_id(), - _publish_version_req.transaction_id); + _publish_version_req.transaction_id, commit_tso); if (first_time_update) { LOG(INFO) << msg; } else { @@ -401,7 +408,8 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine, EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, - Version version, const TabletInfo& tablet_info) + Version version, const TabletInfo& tablet_info, + int64_t commit_tso) : _engine(engine), 
_engine_publish_version_task(engine_task), _tablet(std::move(tablet)), @@ -414,7 +422,8 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine, MemTrackerLimiter::Type::OTHER, fmt::format("TabletPublishTxnTask-partitionID_{}-transactionID_{}-version_{}", std::to_string(partition_id), std::to_string(transaction_id), - version.to_string()))) { + version.to_string()))), + _commit_tso(commit_tso) { _stats.submit_time_us = MonotonicMicros(); } @@ -424,18 +433,19 @@ Status publish_version_and_add_rowset(StorageEngine& engine, int64_t partition_i const TabletSharedPtr& tablet, const RowsetSharedPtr& rowset, int64_t transaction_id, const Version& version, EnginePublishVersionTask* engine_publish_version_task, - TabletPublishStatistics& stats) { + TabletPublishStatistics& stats, int64_t commit_tso) { // ATTN: Here, the life cycle needs to be extended to prevent tablet_txn_info.pending_rs_guard in txn // from being released prematurely, causing path gc to mistakenly delete the dat file std::shared_ptr extend_tablet_txn_info_lifetime = nullptr; // Publish the transaction - auto result = engine.txn_manager()->publish_txn(partition_id, tablet, transaction_id, version, - &stats, extend_tablet_txn_info_lifetime); + auto result = + engine.txn_manager()->publish_txn(partition_id, tablet, transaction_id, version, &stats, + extend_tablet_txn_info_lifetime, commit_tso); if (!result.ok()) { LOG(WARNING) << "failed to publish version. 
rowset_id=" << rowset->rowset_id() << ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id - << ", res=" << result; + << ", commit_tso=" << commit_tso << ", res=" << result; if (engine_publish_version_task) { engine_publish_version_task->add_error_tablet_id(tablet->tablet_id()); } @@ -480,7 +490,7 @@ void TabletPublishTxnTask::handle() { _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us; _result = publish_version_and_add_rowset(_engine, _partition_id, _tablet, _rowset, _transaction_id, _version, - _engine_publish_version_task, _stats); + _engine_publish_version_task, _stats, _commit_tso); if (!_result.ok()) { return; @@ -519,8 +529,9 @@ void AsyncTabletPublishTask::handle() { RowsetSharedPtr rowset = iter->second; Version version(_version, _version); - auto publish_status = publish_version_and_add_rowset(_engine, _partition_id, _tablet, rowset, - _transaction_id, version, nullptr, _stats); + auto publish_status = + publish_version_and_add_rowset(_engine, _partition_id, _tablet, rowset, _transaction_id, + version, nullptr, _stats, _commit_tso); if (!publish_status.ok()) { return; diff --git a/be/src/storage/task/engine_publish_version_task.h b/be/src/storage/task/engine_publish_version_task.h index 4d9cd08ca3bbd2..ba1f70588ae481 100644 --- a/be/src/storage/task/engine_publish_version_task.h +++ b/be/src/storage/task/engine_publish_version_task.h @@ -65,7 +65,8 @@ class TabletPublishTxnTask { public: TabletPublishTxnTask(StorageEngine& engine, EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, - int64_t transaction_id, Version version, const TabletInfo& tablet_info); + int64_t transaction_id, Version version, const TabletInfo& tablet_info, + int64_t commit_tso); ~TabletPublishTxnTask(); void handle(); @@ -84,16 +85,25 @@ class TabletPublishTxnTask { TabletPublishStatistics _stats; Status _result; std::shared_ptr _mem_tracker; + int64_t _commit_tso; +}; + +struct 
DiscontinuousVersionTablet { + int64_t partition_id; + int64_t tablet_id; + int64_t publish_version; + int64_t commit_tso; }; class EnginePublishVersionTask final : public EngineTask { public: - EnginePublishVersionTask( - StorageEngine& engine, const TPublishVersionRequest& publish_version_req, - std::set* error_tablet_ids, std::map* succ_tablets, - std::vector>* discontinous_version_tablets, - std::map>* - table_id_to_tablet_id_to_num_delta_rows); + EnginePublishVersionTask(StorageEngine& engine, + const TPublishVersionRequest& publish_version_req, + std::set* error_tablet_ids, + std::map* succ_tablets, + std::vector* discontinous_version_tablets, + std::map>* + table_id_to_tablet_id_to_num_delta_rows); ~EnginePublishVersionTask() override = default; Status execute() override; @@ -103,8 +113,9 @@ class EnginePublishVersionTask final : public EngineTask { private: void _handle_publish_version_not_continuous(int64_t partition_id, const TabletInfo& tablet_info, const TabletSharedPtr& tablet, - const Version& version, int64_t max_version, - bool first_time_update, Status& res); + const Version& version, const int64_t commit_tso, + int64_t max_version, bool first_time_update, + Status& res); void _calculate_tbl_num_delta_rows( const std::unordered_map& tablet_id_to_num_delta_rows); @@ -113,7 +124,7 @@ class EnginePublishVersionTask final : public EngineTask { std::mutex _tablet_ids_mutex; std::set* _error_tablet_ids = nullptr; std::map* _succ_tablets; - std::vector>* _discontinuous_version_tablets = nullptr; + std::vector* _discontinuous_version_tablets = nullptr; std::map>* _table_id_to_tablet_id_to_num_delta_rows = nullptr; }; @@ -121,14 +132,15 @@ class EnginePublishVersionTask final : public EngineTask { class AsyncTabletPublishTask { public: AsyncTabletPublishTask(StorageEngine& engine, TabletSharedPtr tablet, int64_t partition_id, - int64_t transaction_id, int64_t version) + int64_t transaction_id, int64_t version, int64_t commit_tso) : _engine(engine), 
_tablet(std::move(tablet)), _partition_id(partition_id), _transaction_id(transaction_id), _version(version), _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, - "AsyncTabletPublishTask")) { + "AsyncTabletPublishTask")), + _commit_tso(commit_tso) { _stats.submit_time_us = MonotonicMicros(); } ~AsyncTabletPublishTask() = default; @@ -143,6 +155,7 @@ class AsyncTabletPublishTask { int64_t _version; TabletPublishStatistics _stats; std::shared_ptr _mem_tracker; + int64_t _commit_tso; }; } // namespace doris diff --git a/be/src/storage/txn/txn_manager.cpp b/be/src/storage/txn/txn_manager.cpp index f9320204b78d0f..f42dbcf5de7609 100644 --- a/be/src/storage/txn/txn_manager.cpp +++ b/be/src/storage/txn/txn_manager.cpp @@ -200,10 +200,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, TabletPublishStatistics* stats, - std::shared_ptr& extend_tablet_txn_info) { + std::shared_ptr& extend_tablet_txn_info, + const int64_t commit_tso) { return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, tablet->tablet_id(), tablet->tablet_uid(), version, stats, - extend_tablet_txn_info); + extend_tablet_txn_info, commit_tso); } void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, @@ -459,7 +460,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const Version& version, TabletPublishStatistics* stats, - std::shared_ptr& extend_tablet_txn_info) { + std::shared_ptr& extend_tablet_txn_info, + const int64_t commit_tso) { auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { return Status::OK(); @@ -493,8 +495,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, if (rowset == 
nullptr) { return Status::Error( "publish txn failed, rowset not found. partition_id={}, transaction_id={}, " - "tablet={}", - partition_id, transaction_id, tablet_info.to_string()); + "tablet={}, commit_tso={}", + partition_id, transaction_id, tablet_info.to_string(), commit_tso); } DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { @@ -519,7 +521,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // it is under a single txn lock // TODO(ygl): rowset is already set version here, memory is changed, if save failed // it maybe a fatal error - rowset->make_visible(version); + rowset->make_visible(version, commit_tso); DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { diff --git a/be/src/storage/txn/txn_manager.h b/be/src/storage/txn/txn_manager.h index 684427022ef06f..c7d9fc23796869 100644 --- a/be/src/storage/txn/txn_manager.h +++ b/be/src/storage/txn/txn_manager.h @@ -173,7 +173,8 @@ class TxnManager { Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, TabletPublishStatistics* stats, - std::shared_ptr& extend_tablet_txn_info); + std::shared_ptr& extend_tablet_txn_info, + const int64_t commit_tso = -1); // delete the txn from manager if it is not committed(not have a valid rowset) Status rollback_txn(TPartitionId partition_id, const Tablet& tablet, @@ -192,7 +193,8 @@ class TxnManager { Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const Version& version, TabletPublishStatistics* stats, - std::shared_ptr& extend_tablet_txn_info); + std::shared_ptr& extend_tablet_txn_info, + const int64_t commit_tso = -1); // only abort not committed txn void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, 
TTabletId tablet_id, diff --git a/be/test/storage/storage_engine_test.cpp b/be/test/storage/storage_engine_test.cpp index 8d008808515d96..d9e25b726cdf42 100644 --- a/be/test/storage/storage_engine_test.cpp +++ b/be/test/storage/storage_engine_test.cpp @@ -17,6 +17,7 @@ #include "storage/storage_engine.h" +#include #include #include #include @@ -25,12 +26,15 @@ #include #include +#include +#include #include "common/status.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" #include "storage/data_dir.h" #include "storage/tablet/tablet_manager.h" +#include "storage/tablet/tablet_meta_manager.h" #include "util/threadpool.h" namespace doris { @@ -139,10 +143,28 @@ TEST_F(StorageEngineTest, TestAsyncPublish) { EXPECT_EQ(tablet->max_version().second, 10); for (int64_t i = 5; i < 12; ++i) { - _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false, i * 10); } EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7); EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 5); + + std::unordered_map version_to_commit_tso; + st = TabletMetaManager::traverse_pending_publish( + _data_dir->get_meta(), + [&](int64_t traversed_tablet_id, int64_t publish_version, std::string_view info) { + if (traversed_tablet_id != tablet_id) { + return true; + } + PendingPublishInfoPB pb; + bool parsed = pb.ParseFromArray(info.data(), static_cast(info.size())); + EXPECT_TRUE(parsed); + version_to_commit_tso[publish_version] = pb.commit_tso(); + return true; + }); + EXPECT_TRUE(st.ok()) << st; + EXPECT_EQ(version_to_commit_tso[5], 50); + EXPECT_EQ(version_to_commit_tso[11], 110); + for (int64_t i = 1; i < 8; ++i) { _storage_engine->_process_async_publish(); EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7 - i); @@ -151,13 +173,13 @@ TEST_F(StorageEngineTest, TestAsyncPublish) { 
EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0); for (int64_t i = 100; i < config::max_tablet_version_num + 120; ++i) { - _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false, -1 /*tso*/); } EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), config::max_tablet_version_num + 20); for (int64_t i = 90; i < 120; ++i) { - _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false, -1 /*tso*/); } EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), config::max_tablet_version_num + 30); diff --git a/be/test/storage/txn/txn_manager_test.cpp b/be/test/storage/txn/txn_manager_test.cpp index 1e717b32629455..7a39eebb0c6b6f 100644 --- a/be/test/storage/txn/txn_manager_test.cpp +++ b/be/test/storage/txn/txn_manager_test.cpp @@ -344,6 +344,32 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) { EXPECT_EQ(rowset_meta->end_version(), 11); } +TEST_F(TxnManagerTest, PublishVersionWithCommitTSO) { + auto guard = k_engine->pending_local_rowsets().add(_rowset->rowset_id()); + auto st = k_engine->txn_manager()->commit_txn(_meta.get(), partition_id, transaction_id, + tablet_id, _tablet_uid, load_id, _rowset, + std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + Version new_version(10, 11); + TabletPublishStatistics stats; + int64_t commit_tso = 123456; + { + std::shared_ptr extend_tablet_txn_info_lifetime = nullptr; + st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id, transaction_id, + tablet_id, _tablet_uid, new_version, &stats, + extend_tablet_txn_info_lifetime, commit_tso); + ASSERT_TRUE(st.ok()) << st; + } + + RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); + st = RowsetMetaManager::get_rowset_meta(_meta.get(), _tablet_uid, _rowset->rowset_id(), + rowset_meta); + ASSERT_TRUE(st.ok()) << st; + 
EXPECT_EQ(rowset_meta->start_version(), 10); + EXPECT_EQ(rowset_meta->end_version(), 11); + EXPECT_EQ(rowset_meta->commit_tso(), commit_tso); +} + // 1. publish version failed if not found related txn and rowset TEST_F(TxnManagerTest, PublishNotExistedTxn) { Version new_version(10, 11); @@ -451,7 +477,7 @@ TEST_F(TxnManagerTest, DeleteCommittedTxnCleanupOnError) { // make rowset visible Version version(0, 1); - _rowset->make_visible(version); + _rowset->make_visible(version, -1); // now try to delete the transaction // this should return TRANSACTION_ALREADY_COMMITTED but still clean up the transaction state diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 110f0bbe3bd133..ddf2cd1f18b1f3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3361,6 +3361,50 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static long mow_get_ms_lock_retry_backoff_interval = 80; + @ConfField(mutable = true, masterOnly = true, description = { + "Whether to enable TSO."}, varType = VariableAnnotation.EXPERIMENTAL) + public static boolean enable_tso_feature = false; + + @ConfField(mutable = false, masterOnly = true, description = { + "TSO service update interval in milliseconds. Default is 50, which means the TSO service " + + "will perform timestamp update checks every 50 milliseconds."}) + public static int tso_service_update_interval_ms = 50; + + @ConfField(mutable = true, masterOnly = true, description = { + "TSO service max retry count. Default is 3, which means the TSO service will retry 3 times" + + " to update the global timestamp."}) + public static int tso_max_update_retry_count = 3; + + @ConfField(mutable = true, masterOnly = true, description = { + "TSO get max retry count. 
Default is 10, which means the TSO service will retry 10 times" + " to generate TSO."}) + public static int tso_max_get_retry_count = 10; + + @ConfField(mutable = true, masterOnly = true, description = { + "TSO service time window in milliseconds. Default is 5000, which means the TSO service " + "will apply for a TSO time window of 5000ms from BDBJE once."}) + public static int tso_service_window_duration_ms = 5000; + + @ConfField(mutable = true, masterOnly = true, description = { + "Max tolerated clock backward threshold during TSO calibration in milliseconds. " + "Exceeding this threshold will fail enabling TSO. Default is 30 minutes."}) + public static long tso_clock_backward_startup_threshold_ms = 30L * 60 * 1000; + + @ConfField(mutable = true, description = { + "TSO service time offset in milliseconds. Only for test. Default is 0, which means the TSO service " + "timestamp offset is 0 milliseconds."}) + public static int tso_time_offset_debug_mode = 0; + + @ConfField(mutable = true, masterOnly = true, description = { + "Whether to enable persisting TSO window end into edit log. Enabling emits new op code," + " which may break rollback to older versions."}) + public static boolean enable_tso_persist_journal = false; + + @ConfField(mutable = true, masterOnly = true, description = { + "Whether to include TSO info as an image module in checkpoint. 
Older versions may need to ignore" + + " unknown modules when reading new images."}) + public static boolean enable_tso_checkpoint_module = false; + @ConfField(mutable = true, masterOnly = true) public static boolean enable_notify_be_after_load_txn_commit = false; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 746ca81f6f1dc3..bc7a7535a930f1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -102,13 +102,15 @@ public final class FeMetaVersion { public static final int VERSION_139 = 139; public static final int VERSION_140 = 140; + // For tso + public static final int VERSION_141 = 141; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_140; + public static final int VERSION_CURRENT = VERSION_141; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... 
// these clause will be useless and we could remove them - public static final int MINIMUM_VERSION_REQUIRED = VERSION_140; + public static final int MINIMUM_VERSION_REQUIRED = VERSION_141; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 55ebf57ee78144..259a37f25d3a9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2574,6 +2574,7 @@ public void updateTableProperties(Database db, String tableName, Map> configtoThreads = ImmutableMap .of("dynamic_partition_check_interval_seconds", this::getDynamicPartitionScheduler); + private TSOService tsoService; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -858,6 +861,7 @@ public Env(boolean isCheckpointCatalog) { if (Config.agent_task_health_check_intervals_ms > 0) { this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon(); } + this.tsoService = new TSOService(); } public static Map getSessionReportTimeMap() { @@ -2003,6 +2007,7 @@ protected void startMasterOnlyDaemonThreads() { keyManager.init(); } agentTaskCleanupDaemon.start(); + tsoService.start(); } // start threads that should run on all FE @@ -2886,6 +2891,16 @@ public long saveKeyManagerStore(CountingDataOutputStream out, long checksum) thr return checksum; } + // Persist TSO-related info into image for fast recovery + public long saveTSO(CountingDataOutputStream dos, long checksum) throws IOException { + return tsoService.saveTSO(dos, checksum); + } + + // Load TSO-related info from image during checkpoint load + public long loadTSO(DataInputStream dis, long checksum) throws IOException { + return tsoService.loadTSO(dis, checksum); + } + public long saveConstraintManager(CountingDataOutputStream out, long checksum) throws IOException { constraintManager.write(out); LOG.info("finished save ConstraintManager to 
image"); @@ -6230,6 +6245,7 @@ public void modifyTableProperties(Database db, OlapTable table, Map()); + } + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ENABLE_TSO, + Boolean.valueOf(enableTso).toString()); + tableProperty.buildEnableTso(); + } + + public Boolean enableTso() { + if (tableProperty != null) { + return tableProperty.enableTso(); + } + return false; + } + public void setStoreRowColumn(boolean storeRowColumn) { TableProperty tableProperty = getOrCreatTableProperty(); tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 6f269113012918..dd39a0de58286b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -426,6 +426,7 @@ public class SchemaTable extends Table { .column("CREATION_TIME", ScalarType.createType(PrimitiveType.DATETIME)) .column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.DATETIME)) .column("SCHEMA_VERSION", ScalarType.createType(PrimitiveType.INT)) + .column("COMMIT_TSO", ScalarType.createType(PrimitiveType.BIGINT)) .build())) .put("parameters", new SchemaTable(SystemIdGenerator.getNextId(), "parameters", TableType.SCHEMA, builder().column("SPECIFIC_CATALOG", ScalarType.createVarchar(64)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 7ada8286d9878d..2221a8f8fb7340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -98,6 +98,8 @@ public class TableProperty implements GsonPostProcessable { private boolean enableSingleReplicaCompaction = false; + private boolean enableTso = false; + private int 
verticalCompactionNumColumnsPerGroup = 5; private boolean storeRowColumn = false; @@ -169,6 +171,7 @@ public TableProperty buildProperty(short opCode) { buildTimeSeriesCompactionTimeThresholdSeconds(); buildSkipWriteIndexOnLoad(); buildEnableSingleReplicaCompaction(); + buildEnableTso(); buildVerticalCompactionNumColumnsPerGroup(); buildDisableAutoCompaction(); buildTimeSeriesCompactionEmptyRowsetsThreshold(); @@ -357,6 +360,15 @@ public boolean enableSingleReplicaCompaction() { return enableSingleReplicaCompaction; } + public TableProperty buildEnableTso() { + enableTso = Boolean.parseBoolean(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_TSO, "false")); + return this; + } + + public boolean enableTso() { + return enableTso; + } + public TableProperty buildVerticalCompactionNumColumnsPerGroup() { verticalCompactionNumColumnsPerGroup = Integer.parseInt( properties.getOrDefault(PropertyAnalyzer.PROPERTIES_VERTICAL_COMPACTION_NUM_COLUMNS_PER_GROUP, "5")); @@ -918,6 +930,7 @@ public void gsonPostProcess() throws IOException { buildTimeSeriesCompactionTimeThresholdSeconds(); buildDisableAutoCompaction(); buildEnableSingleReplicaCompaction(); + buildEnableTso(); buildVerticalCompactionNumColumnsPerGroup(); buildTimeSeriesCompactionEmptyRowsetsThreshold(); buildTimeSeriesCompactionLevelThreshold(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index c2155963273e09..c4b4c45ab2b476 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -112,6 +112,7 @@ public void updateTableProperties(Database db, String tableName, Map p + " must be `true` or `false`"); } + public static Boolean analyzeEnableTso(Map properties) throws AnalysisException { + if (properties == null || properties.isEmpty()) { + 
return false; + } + String value = properties.get(PROPERTIES_ENABLE_TSO); + if (null == value) { + return false; + } + properties.remove(PROPERTIES_ENABLE_TSO); + if (value.equalsIgnoreCase("true")) { + if (!Config.enable_tso_feature) { + throw new AnalysisException(PROPERTIES_ENABLE_TSO + + " can not be enabled when experimental_enable_tso_feature is disabled"); + } + return true; + } else if (value.equalsIgnoreCase("false")) { + return false; + } + throw new AnalysisException(PROPERTIES_ENABLE_TSO + " must be `true` or `false`"); + } + public static Boolean analyzeEnableDuplicateWithoutKeysByDefault(Map properties) throws AnalysisException { if (properties == null || properties.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index cbbc3dd1797242..d23d12d0e8f5c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1537,6 +1537,10 @@ public void addPartition(Database db, String tableName, AddPartitionOp addPartit properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION, olapTable.enableSingleReplicaCompaction().toString()); } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_TSO)) { + properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_TSO, + olapTable.enableTso().toString()); + } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN)) { properties.put(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, olapTable.storeRowColumn().toString()); @@ -2627,6 +2631,14 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th } olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction); + boolean enableTso = false; + try { + enableTso = PropertyAnalyzer.analyzeEnableTso(properties); + } catch (AnalysisException e) { + throw new 
DdlException(e.getMessage()); + } + olapTable.setEnableTso(enableTso); + if (Config.isCloudMode() && ((CloudEnv) env).getEnableStorageVault()) { // Pair storageVaultInfoPair = PropertyAnalyzer.analyzeStorageVault(properties, db); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TSOAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TSOAction.java new file mode 100644 index 00000000000000..ccaac415ac3529 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TSOAction.java @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.tso.TSOTimestamp; + +import com.google.common.collect.Maps; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * TSOAction is used to get TSO (Timestamp Oracle) information via HTTP interface. + * This interface only allows retrieving TSO information without increasing the TSO value. + * + * Example usage: + * GET /api/tso + * Response: + * { + * "window_end_physical_time": 1625097600000, + * "current_tso": 123456789012345678, + * "current_tso_physical_time": 1625097600000, + * "current_tso_logical_counter": 123 + * } + */ +@RestController +public class TSOAction extends RestBaseController { + + private static final Logger LOG = LogManager.getLogger(TSOAction.class); + + /** + * Get current TSO information. + * This interface only returns TSO information without increasing the TSO value. 
+ * + * @param request HTTP request + * @param response HTTP response + * @return ResponseEntity with TSO information including: + * - window_end_physical_time: The end time of the current TSO window + * - current_tso: The current composed TSO timestamp + * - current_tso_physical_time: The physical time part of current TSO + * - current_tso_logical_counter: The logical counter part of current TSO + */ + @RequestMapping(path = "/api/tso", method = RequestMethod.GET) + public Object getTSO(HttpServletRequest request, HttpServletResponse response) { + try { + //check user auth + executeCheckPassword(request, response); + + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady()) { + LOG.warn("TSO HTTP API: FE is not ready"); + return ResponseEntityBuilder.badRequest("FE is not ready"); + } + if (!env.isMaster()) { + LOG.info("TSO HTTP API: current FE is not master, forward to master {}:{}", + env.getMasterHost(), env.getMasterHttpPort()); + return forwardToMasterAndAddMasterAddress(request, env); + } + // Get current TSO information without increasing it + long windowEndPhysicalTime = env.getTSOService().getWindowEndTSO(); + long currentTSO = env.getTSOService().getCurrentTSO(); + + // Prepare response data with detailed TSO information + Map result = Maps.newHashMap(); + result.put("window_end_physical_time", windowEndPhysicalTime); + result.put("current_tso", currentTSO); + result.put("current_tso_physical_time", TSOTimestamp.extractPhysicalTime(currentTSO)); + result.put("current_tso_logical_counter", TSOTimestamp.extractLogicalCounter(currentTSO)); + return ResponseEntityBuilder.ok(result); + } catch (Exception e) { + LOG.warn("Failed to get TSO information", e); + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + @SuppressWarnings("unchecked") + private Object forwardToMasterAndAddMasterAddress(HttpServletRequest request, Env env) { + Object forwarded = forwardToMaster(request); + String masterFeAddr = env.getMasterHost() + ":" + 
env.getMasterHttpPort(); + if (forwarded instanceof ResponseEntity) { + return forwarded; + } + if (forwarded instanceof Map) { + ((Map) forwarded).put("master_fe_addr", masterFeAddr); + return forwarded; + } + Map wrapped = Maps.newHashMap(); + wrapped.put("result", forwarded); + wrapped.put("master_fe_addr", masterFeAddr); + return ResponseEntityBuilder.ok(wrapped); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 0f1712110f1296..505289e7324c76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -143,6 +143,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.tso.TSOTimestamp; import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; @@ -1009,6 +1010,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_TSO_TIMESTAMP_WINDOW_END: { + data = TSOTimestamp.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java index f7028b008f0276..7aa1021762ce37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java @@ -220,4 +220,12 @@ public List getDatabaseNames() { public boolean exceedMaxJournalSize(short op, Writable writable) throws IOException { return false; } + + public boolean isReadyToFlush() { + if (outputStream == null) { + return false; + } else { + return true; + } + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index c28c2aeb99f207..35adf866121ac7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -265,6 +265,14 @@ public final class MetricRepo { public static AutoMappedMetric COUNTER_AGENT_TASK_TOTAL; public static AutoMappedMetric COUNTER_AGENT_TASK_RESEND_TOTAL; + // TSO + public static LongCounterMetric COUNTER_TSO_CLOCK_DRIFT_DETECTED; + public static LongCounterMetric COUNTER_TSO_CLOCK_BACKWARD_DETECTED; + public static LongCounterMetric COUNTER_TSO_CLOCK_CALCULATED; + public static LongCounterMetric COUNTER_TSO_CLOCK_UPDATED; + public static LongCounterMetric COUNTER_TSO_CLOCK_UPDATE_FAILED; + public static LongCounterMetric COUNTER_TSO_CLOCK_GET_SUCCESS; + private static Map, Long> loadJobNum = Maps.newHashMap(); private static final ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, @@ -1062,6 +1070,26 @@ public Integer getValue() { COUNTER_AGENT_TASK_RESEND_TOTAL = addLabeledMetrics("task", () -> new LongCounterMetric("agent_task_resend_total", MetricUnit.NOUNIT, "total agent task resend")); + // TSO + COUNTER_TSO_CLOCK_DRIFT_DETECTED = new LongCounterMetric("tso_clock_drift_detected", MetricUnit.NOUNIT, + "counter of tso clock drift detected"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_DRIFT_DETECTED); + COUNTER_TSO_CLOCK_BACKWARD_DETECTED = new LongCounterMetric("tso_clock_backward_detected", MetricUnit.NOUNIT, + "counter of tso clock backward detected"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_BACKWARD_DETECTED); + COUNTER_TSO_CLOCK_CALCULATED = new LongCounterMetric("tso_clock_calculated", MetricUnit.NOUNIT, + "counter of tso clock calculated"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_CALCULATED); + COUNTER_TSO_CLOCK_UPDATED = new 
LongCounterMetric("tso_clock_updated", MetricUnit.NOUNIT, + "counter of tso clock updated"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_UPDATED); + COUNTER_TSO_CLOCK_UPDATE_FAILED = new LongCounterMetric("tso_clock_update_failed", MetricUnit.NOUNIT, + "counter of tso clock update failed"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_UPDATE_FAILED); + COUNTER_TSO_CLOCK_GET_SUCCESS = new LongCounterMetric("tso_clock_get_success", MetricUnit.NOUNIT, + "counter of tso clock get success"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_TSO_CLOCK_GET_SUCCESS); + // init system metrics initSystemMetrics(); CloudMetrics.init(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java index 8ae93531a0f5a0..ea7437b2127213 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableProperty; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DatasourcePrintableMap; import org.apache.doris.common.util.DynamicPartitionUtil; @@ -295,6 +296,22 @@ public void validate(ConnectContext ctx) throws UserException { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_TSO)) { + if (!properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_TSO).equalsIgnoreCase("true") + && !properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_TSO).equalsIgnoreCase("false")) { + throw new AnalysisException( + "Property " + + 
PropertyAnalyzer.PROPERTIES_ENABLE_TSO + + " should be set to true or false"); + } + if (properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_TSO).equalsIgnoreCase("true") + && !Config.enable_tso_feature) { + throw new AnalysisException( + "Property " + PropertyAnalyzer.PROPERTIES_ENABLE_TSO + + " can not be enabled when experimental_enable_tso_feature is disabled"); + } + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE) .equalsIgnoreCase("true") diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index ab36a7d5a3b041..19043a60aea2e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -106,6 +106,7 @@ import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.tso.TSOTimestamp; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -1467,6 +1468,10 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { // This log is only used to keep FE/MS cut point in journal timeline. 
break; } + case OperationType.OP_TSO_TIMESTAMP_WINDOW_END: { + env.getCurrentTSOService().replayWindowEndTSO((TSOTimestamp) journal.getData()); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode, logId, e); @@ -1910,6 +1915,10 @@ public void logTimestamp(Timestamp stamp) { logEdit(OperationType.OP_TIMESTAMP, stamp); } + public void logTSOTimestampWindowEnd(TSOTimestamp windowEnd) { + logEdit(OperationType.OP_TSO_TIMESTAMP_WINDOW_END, windowEnd); + } + public void logMasterInfo(MasterInfo info) { logEdit(OperationType.OP_MASTER_INFO_CHANGE, info); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 96bb4669e41763..6e5dad9348b4f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -431,6 +431,8 @@ public class OperationType { public static final short OP_BEGIN_SNAPSHOT = 1100; public static final short OP_META_SYNC_POINT = 1101; + public static final short OP_TSO_TIMESTAMP_WINDOW_END = 1200; + /** * Get opcode name by op code. 
**/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index 995b09d50dc4d5..5e4b10cb48cb9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -287,6 +287,12 @@ public static MetaPersistMethod create(String name) throws NoSuchMethodException metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveConstraintManager", CountingDataOutputStream.class, long.class); break; + case "tso": + metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadTSO", DataInputStream.class, + long.class); + metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveTSO", + CountingDataOutputStream.class, long.class); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index 6f5734aaa46878..72530bea9ef245 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -44,7 +44,7 @@ public class PersistMetaModules { "globalFunction", "workloadGroups", "binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy", "insertOverwrite", "plsql", "dictionaryManager", "indexPolicy", "KeyManagerStore", - "constraintManager" + "constraintManager", "tso" // TODO: Re-enable "authenticationIntegrations" after persistence requirements are confirmed. 
// , "authenticationIntegrations" ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index b0a17790170791..0b03eaf6c44a0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1578,7 +1578,7 @@ private PartitionCommitInfo generatePartitionCommitInfo(OlapTable table, long pa protected void unprotectedCommitTransaction(TransactionState transactionState, Set errorReplicaIds, Map> tableToPartition, Set totalInvolvedBackends, - Database db) { + Database db) throws TransactionCommitFailedException { // transaction state is modified during check if the transaction could committed if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { return; @@ -1586,6 +1586,9 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S // update transaction state version long commitTime = System.currentTimeMillis(); transactionState.setCommitTime(commitTime); + long commitTSO = getCommitTSO(transactionState, db, tableToPartition.keySet()); + transactionState.setCommitTSO(commitTSO); + if (MetricRepo.isInit) { MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime()); } @@ -1593,6 +1596,10 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S for (long tableId : tableToPartition.keySet()) { OlapTable table = (OlapTable) db.getTableNullable(tableId); TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); + if (Config.enable_tso_feature && table.enableTso()) { + tableCommitInfo.setCommitTSO(commitTSO); + } + for (long partitionId : tableToPartition.get(tableId)) { Partition partition = table.getPartition(partitionId); tableCommitInfo.addPartitionCommitInfo( @@ -1611,7 +1618,7 @@ protected void 
unprotectedCommitTransaction(TransactionState transactionState, S protected void unprotectedCommitTransaction(TransactionState transactionState, Set errorReplicaIds, Map> subTxnToPartition, Set totalInvolvedBackends, - List subTransactionStates, Database db) { + List subTransactionStates, Database db) throws TransactionCommitFailedException { // transaction state is modified during check if the transaction could committed if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { return; @@ -1619,6 +1626,14 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S // update transaction state version long commitTime = System.currentTimeMillis(); transactionState.setCommitTime(commitTime); + Set tableIds = new HashSet<>(); + for (SubTransactionState subTransactionState : subTransactionStates) { + long tableId = subTransactionState.getTable().getId(); + tableIds.add(tableId); + } + long commitTSO = getCommitTSO(transactionState, db, tableIds); + transactionState.setCommitTSO(commitTSO); + if (MetricRepo.isInit) { MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime()); } @@ -1646,6 +1661,9 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); tableCommitInfo.setVersion(tableNextVersion); tableCommitInfo.setVersionTime(System.currentTimeMillis()); + if (Config.enable_tso_feature && table.enableTso()) { + tableCommitInfo.setCommitTSO(commitTSO); + } for (long partitionId : partitionIds) { long partitionNextVersion = table.getPartition(partitionId).getNextVersion(); @@ -1673,7 +1691,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S transactionState.setInvolvedBackends(totalInvolvedBackends); } - protected void unprotectedCommitTransaction2PC(TransactionState transactionState, Database db) { + protected void unprotectedCommitTransaction2PC(TransactionState 
transactionState, Database db) + throws TransactionCommitFailedException { // transaction state is modified during check if the transaction could committed if (transactionState.getTransactionStatus() != TransactionStatus.PRECOMMITTED) { LOG.warn("Unknown exception. state of transaction [{}] changed, failed to commit transaction", @@ -1682,6 +1701,9 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState } // update transaction state version transactionState.setCommitTime(System.currentTimeMillis()); + long commitTSO = getCommitTSO(transactionState, db, transactionState.getIdToTableCommitInfos().keySet()); + transactionState.setCommitTSO(commitTSO); + transactionState.setTransactionStatus(TransactionStatus.COMMITTED); Iterator tableCommitInfoIterator @@ -1698,6 +1720,9 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState transactionState); continue; } + if (Config.enable_tso_feature && table.enableTso()) { + tableCommitInfo.setCommitTSO(commitTSO); + } Iterator partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); while (partitionCommitInfoIterator.hasNext()) { @@ -3050,4 +3075,43 @@ private void cleanSubTransactions(long transactionId) { } } } + + private long getCommitTSO(TransactionState transactionState, Database db, Set tableIds) + throws TransactionCommitFailedException { + long tso = -1L; + if (!Config.enable_tso_feature) { + return tso; + } + if (tableIds == null || tableIds.isEmpty()) { + return tso; + } + boolean anyEnableTso = false; + for (long tableId : tableIds) { + Table table = db.getTableNullable(tableId); + if (table instanceof OlapTable && ((OlapTable) table).enableTso()) { + anyEnableTso = true; + break; + } + } + if (!anyEnableTso) { + return tso; + } + try { + Env env = Env.getCurrentEnv(); + if (env == null || env.getTSOService() == null) { + throw new TransactionCommitFailedException("failed to get TSO for txn " + + 
transactionState.getTransactionId() + ": TSO service is unavailable"); + } + long fetched = env.getTSOService().getTSO(); + if (fetched <= 0) { + throw new TransactionCommitFailedException("failed to get TSO for txn " + + transactionState.getTransactionId() + ", fetched=" + fetched); + } + return fetched; + } catch (RuntimeException e) { + LOG.warn("failed to get TSO for txn {}, abort commit", transactionState.getTransactionId(), e); + throw new TransactionCommitFailedException("failed to get TSO for txn " + + transactionState.getTransactionId(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java index 412ae065f3e2dd..3d0d13490646d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java @@ -39,6 +39,8 @@ public class TableCommitInfo { private long version; @SerializedName(value = "versionTime") private long versionTime; + @SerializedName(value = "ctso", alternate = {"commitTSO"}) + private long commitTSO = -1; public TableCommitInfo() { @@ -81,6 +83,14 @@ public void setVersionTime(long versionTime) { this.versionTime = versionTime; } + public long getCommitTSO() { + return commitTSO; + } + + public void setCommitTSO(long commitTSO) { + this.commitTSO = commitTSO; + } + public List generateTPartitionVersionInfos() { return idToPartitionCommitInfo .values().stream() @@ -89,8 +99,10 @@ public List generateTPartitionVersionInfos() { LOG.debug("try to publish version info partitionid [{}], version [{}]", commitInfo.getPartitionId(), commitInfo.getVersion()); } - return new TPartitionVersionInfo(commitInfo.getPartitionId(), - commitInfo.getVersion(), 0); + TPartitionVersionInfo info = new TPartitionVersionInfo(commitInfo.getPartitionId(), + commitInfo.getVersion(), 0L); + info.setCommitTso(commitTSO); + return info; 
}).collect(Collectors.toList()); } @@ -99,6 +111,7 @@ public String toString() { return new StringBuilder("TableCommitInfo{tableId=").append(tableId) .append(", idToPartitionCommitInfo=").append(idToPartitionCommitInfo) .append(", version=").append(version).append(", versionTime=").append(versionTime) + .append(", commitTSO=").append(commitTSO) .append('}').toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 16a4761e340505..bee5fd7d603e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -323,6 +323,12 @@ public SchemaInfo(OlapTable olapTable) { @Setter private Set involvedBackends = Sets.newHashSet(); + // commit tso of this transaction + @SerializedName(value = "commitTSO") + @Getter + @Setter + private long commitTSO = -1; + public TransactionState() { this.dbId = -1; this.tableIdList = Lists.newArrayList(); @@ -342,6 +348,7 @@ public TransactionState() { this.publishVersionTasks = Maps.newHashMap(); this.hasSendTask = false; this.visibleLatch = new CountDownLatch(1); + this.commitTSO = -1; } public TransactionState(long dbId, List tableIdList, long transactionId, String label, TUniqueId requestId, @@ -366,6 +373,7 @@ public TransactionState(long dbId, List tableIdList, long transactionId, S this.visibleLatch = new CountDownLatch(1); this.callbackId = callbackId; this.timeoutMs = timeoutMs; + this.commitTSO = -1; } //for TxnInfoPB convert to TransactionState @@ -728,6 +736,9 @@ public String toString() { if (!subTxnIdToTableCommitInfo.isEmpty()) { sb.append(", sub txn table commit info: ").append(subTxnIdToTableCommitInfo); } + if (commitTSO > 0) { + sb.append(", commit tso: ").append(commitTSO); + } return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java 
b/fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java new file mode 100644 index 00000000000000..eead1626030341 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java @@ -0,0 +1,503 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tso; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.journal.local.LocalJournal; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.EditLog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +public class TSOService extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(TSOService.class); + + // Global timestamp with physical time and logical counter + private final TSOTimestamp globalTimestamp = new TSOTimestamp(); + // Lock for thread-safe access to global timestamp + private final ReentrantLock lock = new ReentrantLock(); + // Guard value for time window updates (in milliseconds) + private static final long UPDATE_TIME_WINDOW_GUARD = 1; + + private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private final AtomicBoolean fatalClockBackwardReported = new AtomicBoolean(false); + private final AtomicLong windowEndTSO = new AtomicLong(0); + + private static final class TSOClockBackwardException extends RuntimeException { + private TSOClockBackwardException(String message) { + super(message); + } + } + + /** + * Constructor initializes the TSO service with update interval + */ + public TSOService() { + super("TSO-service", Config.tso_service_update_interval_ms); + } + + /** + * Start the TSO service. 
+ */ + @Override + public synchronized void start() { + super.start(); + } + + /** + * Periodically update timestamp after catalog is ready + * This method is called by the MasterDaemon framework + */ + @Override + protected void runAfterCatalogReady() { + if (!Config.enable_tso_feature) { + isInitialized.set(false); + fatalClockBackwardReported.set(false); + return; + } + int maxUpdateRetryCount = Math.max(1, Config.tso_max_update_retry_count); + boolean updated = false; + Throwable lastFailure = null; + if (!isInitialized.get()) { + for (int i = 0; i < maxUpdateRetryCount; i++) { + if (isInitialized.get()) { + break; + } + LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp"); + try { + calibrateTimestamp(); + } catch (TSOClockBackwardException e) { + lastFailure = e; + if (fatalClockBackwardReported.compareAndSet(false, true)) { + LOG.error("TSO service calibrate timestamp failed due to clock backward beyond threshold", e); + throw e; + } + return; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service calibrate timestamp failed", e); + } + if (!isInitialized.get()) { + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + if (!isInitialized.get()) { + return; + } + } + + for (int i = 0; i < maxUpdateRetryCount; i++) { + try { + updateTimestamp(); + updated = true; + break; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service update timestamp failed, retry: {}", i, e); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L); + } + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + + if (updated) { + if (LOG.isDebugEnabled()) { + LOG.debug("TSO service updated timestamp"); + } + } else if (lastFailure != 
null) { + LOG.warn("TSO service update timestamp failed after {} retries", + maxUpdateRetryCount, lastFailure); + } else { + LOG.warn("TSO service update timestamp failed after {} retries", maxUpdateRetryCount); + } + } + + /** + * Generate a single TSO timestamp + * + * @return Composed TSO timestamp combining physical time and logical counter + * @throws RuntimeException if TSO is not calibrated or other errors occur + */ + public long getTSO() { + if (!isInitialized.get()) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + int maxGetTSORetryCount = Math.max(1, Config.tso_max_get_retry_count); + RuntimeException lastFailure = null; + for (int i = 0; i < maxGetTSORetryCount; i++) { + // Wait for environment to be ready and ensure we're running on master FE + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady()) { + LOG.warn("TSO service wait for catalog ready"); + lastFailure = new RuntimeException("Env is null or not ready"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } else if (!env.isMaster()) { + LOG.warn("TSO service only run on master FE"); + lastFailure = new RuntimeException("Current FE is not master"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + + Pair pair = generateTSO(); + long physical = pair.first; + long logical = pair.second; + + if (physical == 0) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + + // Check for logical counter overflow + if (logical > TSOTimestamp.MAX_LOGICAL_COUNTER) { + LOG.warn("TSO timestamp logical counter overflow, please check"); + lastFailure = new RuntimeException("TSO timestamp logical counter overflow"); + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException 
ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L); + } + return TSOTimestamp.composeTimestamp(physical, logical); + } + throw new RuntimeException("Failed to get TSO after " + maxGetTSORetryCount + " retries", lastFailure); + } + + /** + * Get the current composed TSO timestamp + * + * @return Current TSO timestamp combining physical time and logical counter + */ + public long getCurrentTSO() { + lock.lock(); + try { + return globalTimestamp.composeTimestamp(); + } finally { + lock.unlock(); + } + } + + /** + * Calibrate the TSO timestamp when service starts + * This ensures the timestamp is consistent with the last persisted value + * + * Algorithm: + * - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1 + * - Otherwise Tnext = Tnow + */ + private void calibrateTimestamp() { + if (isInitialized.get()) { + return; + } + // Check if Env is ready before calibration + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp calibration"); + return; + } + + long timeLast = windowEndTSO.get(); // Last timestamp from image/editlog replay + long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long backwardMs = timeLast - timeNow; + if (backwardMs > Config.tso_clock_backward_startup_threshold_ms) { + throw new TSOClockBackwardException("TSO clock backward too much during calibration, backwardMs=" + + backwardMs + ", thresholdMs=" + Config.tso_clock_backward_startup_threshold_ms + + ", lastWindowEndTSO=" + timeLast + ", currentMillis=" + timeNow); + } + + // Calculate next physical time to ensure monotonicity + long nextPhysicalTime; + if (timeNow - timeLast < 1) { + nextPhysicalTime = timeLast + 1; + } else { + nextPhysicalTime = timeNow; + } + + // Construct new timestamp (physical time with reset logical 
counter) + setTSOPhysical(nextPhysicalTime, true); + + // Write the right boundary of time window to BDBJE for persistence + long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms; + windowEndTSO.set(timeWindowEnd); + writeTimestampToBDBJE(timeWindowEnd); + isInitialized.set(true); + + LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}", + timeLast, timeNow, nextPhysicalTime, timeWindowEnd); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L); + } + } + + /** + * Update timestamp periodically to maintain time window + * This method handles various time-related issues: + * 1. Clock drift detection + * 2. Clock backward detection + * 3. Logical counter overflow handling + * 4. Time window renewal + */ + private void updateTimestamp() { + // Check if Env is ready + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp update"); + return; + } + + // 1. Check if TSO has been calibrated + long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long prevPhysicalTime = 0; + long prevLogicalCounter = 0; + + lock.lock(); + try { + prevPhysicalTime = globalTimestamp.getPhysicalTimestamp(); + prevLogicalCounter = globalTimestamp.getLogicalCounter(); + } finally { + lock.unlock(); + } + + if (prevPhysicalTime == 0) { + LOG.error("TSO timestamp is not calibrated, please check"); + } + + // 2. 
Check for serious clock issues + long timeLag = currentTime - prevPhysicalTime; + if (timeLag >= 3 * Config.tso_service_update_interval_ms) { + // Clock drift (time difference too large), log clearly and trigger corresponding metric + LOG.warn("TSO clock drift detected, lastPhysicalTime={}, currentTime={}, " + + "timeLag={} (exceeds 3 * update interval {})", + prevPhysicalTime, currentTime, timeLag, 3 * Config.tso_service_update_interval_ms); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_DRIFT_DETECTED.increase(1L); + } + } else if (timeLag < 0) { + // Clock backward (current time earlier than last recorded time) + // log clearly and trigger corresponding metric + LOG.warn("TSO clock backward detected, lastPhysicalTime={}, currentTime={}, " + + "timeLag={} (current time is earlier than last physical time)", + prevPhysicalTime, currentTime, timeLag); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_BACKWARD_DETECTED.increase(1L); + } + } + + // 3. Update time based on conditions + long nextPhysicalTime = prevPhysicalTime; + if (timeLag > UPDATE_TIME_WINDOW_GUARD) { + // Align physical time to current time + nextPhysicalTime = currentTime; + } else if (prevLogicalCounter > TSOTimestamp.MAX_LOGICAL_COUNTER / 2) { + // Logical counter nearly full → advance to next millisecond + nextPhysicalTime = prevPhysicalTime + 1; + } else { + // Logical counter not nearly full → just increment logical counter + // do nothing + } + + // 4. Check if time window right boundary needs renewal + if ((windowEndTSO.get() - nextPhysicalTime) <= UPDATE_TIME_WINDOW_GUARD) { + // Time window right boundary needs renewal + long nextWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms; + windowEndTSO.set(nextWindowEnd); + writeTimestampToBDBJE(nextWindowEnd); + } + + // 5. 
Update global timestamp + setTSOPhysical(nextPhysicalTime, false); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_UPDATED.increase(1L); + } + } + + /** + * Write the right boundary of TSO time window to BDBJE for persistence + * + * @param timestamp The timestamp to write + */ + private void writeTimestampToBDBJE(long timestamp) { + try { + // Check if Env is ready + Env env = Env.getCurrentEnv(); + if (env == null) { + LOG.warn("Env is null, skip writing TSO timestamp to BDBJE"); + return; + } + + // Check if Env is ready and is master + if (!env.isReady()) { + LOG.warn("Env is not ready, skip writing TSO timestamp to BDBJE"); + return; + } + + if (!env.isMaster()) { + LOG.warn("Current node is not master, skip writing TSO timestamp to BDBJE"); + return; + } + + TSOTimestamp tsoTimestamp = new TSOTimestamp(timestamp, 0); + + // Check if EditLog is available + EditLog editLog = env.getEditLog(); + if (editLog == null) { + LOG.warn("EditLog is null, skip writing TSO timestamp to BDBJE"); + return; + } + + // Additional check to ensure EditLog's journal is properly initialized + if (editLog.getJournal() == null) { + LOG.warn("EditLog's journal is null, skip writing TSO timestamp to BDBJE"); + return; + } + + if (editLog.getJournal() instanceof LocalJournal) { + if (!((LocalJournal) editLog.getJournal()).isReadyToFlush()) { + LOG.warn("EditLog's journal is not ready to flush, skip writing TSO timestamp to BDBJE"); + return; + } + } + + if (Config.enable_tso_persist_journal) { + editLog.logTSOTimestampWindowEnd(tsoTimestamp); + } else { + LOG.debug("TSO timestamp {} is not persisted to journal, " + + "please check if enable_tso_persist_journal is set to true", + tsoTimestamp); + } + } catch (Exception e) { + LOG.error("Failed to write TSO timestamp to BDBJE", e); + } + } + + /** + * Generate a single TSO timestamp by incrementing the logical counter + * + * @return Pair of (physicalTime, updatedLogicalCounter) for the base timestamp + */ + private Pair 
generateTSO() { + lock.lock(); + try { + long physicalTime = globalTimestamp.getPhysicalTimestamp(); + if (physicalTime == 0) { + return Pair.of(0L, 0L); + } + long logicalCounter = globalTimestamp.getLogicalCounter(); + if (logicalCounter >= TSOTimestamp.MAX_LOGICAL_COUNTER) { + return Pair.of(physicalTime, logicalCounter + 1); + } + long nextLogical = logicalCounter + 1; + globalTimestamp.setLogicalCounter(nextLogical); + return Pair.of(physicalTime, nextLogical); + } finally { + lock.unlock(); + } + } + + /** + * Set the physical time component of the global timestamp + * + * @param next New physical time value + * @param force Whether to force update even if physical time is zero + */ + private void setTSOPhysical(long next, boolean force) { + lock.lock(); + try { + // Do not update the zero physical time if the `force` flag is false. + if (!force && globalTimestamp.getPhysicalTimestamp() == 0) { + return; + } + if (next - globalTimestamp.getPhysicalTimestamp() > 0) { + globalTimestamp.setPhysicalTimestamp(next); + globalTimestamp.setLogicalCounter(0L); + } + } finally { + lock.unlock(); + } + } + + /** + * Replay handler for TSO window end timestamp from edit log. + * This method updates TSO service state. + * It is safe to call during checkpoint replay when TSOService may not be initialized. 
+ * + * @param windowEnd New window end physical time + */ + public void replayWindowEndTSO(TSOTimestamp windowEnd) { + windowEndTSO.set(windowEnd.getPhysicalTimestamp()); + } + + public long getWindowEndTSO() { + return windowEndTSO.get(); + } + + public long saveTSO(CountingDataOutputStream dos, long checksum) throws IOException { + if (!Config.enable_tso_feature || !Config.enable_tso_checkpoint_module) { + return checksum; + } + TSOTimestamp tsoTimestamp = new TSOTimestamp(windowEndTSO.get(), 0); + tsoTimestamp.write(dos); + checksum ^= tsoTimestamp.getPhysicalTimestamp(); + LOG.info("Save TSO windowEndTSO {} to image", tsoTimestamp); + return checksum; + } + + public long loadTSO(DataInputStream dis, long checksum) throws IOException { + TSOTimestamp tsoTimestamp = TSOTimestamp.read(dis); + windowEndTSO.set(tsoTimestamp.getPhysicalTimestamp()); + long newChecksum = checksum ^ tsoTimestamp.getPhysicalTimestamp(); + LOG.info("Finished replay TSO windowEndTSO {} from image", windowEndTSO.get()); + return newChecksum; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tso/TSOTimestamp.java b/fe/fe-core/src/main/java/org/apache/doris/tso/TSOTimestamp.java new file mode 100644 index 00000000000000..4b2eef7ee075ad --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tso/TSOTimestamp.java @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tso; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * TSOTimestamp represents a Timestamp Oracle timestamp with physical time and logical counter. + * + * TSO timestamp format (64 bits): + * 63 18 17 0 + * ┌─────────────────────────────────────────────────────────┬──────────────────────────────────────┐ + * | Physical Time (milliseconds, 46 bits) │ Logical Counter 18 bits │ + * └─────────────────────────────────────────────────────────┴──────────────────────────────────────┘ + * + * Example: + * Physical time: 1625097600000 (milliseconds, 46 bits) + * Logical counter: 123 (18 bits) + * Combined TSO: 123456789012345678 + */ +public final class TSOTimestamp implements Writable, Comparable { + + @SerializedName(value = "physicalTimestamp") + private long physicalTimestamp = 0L; + @SerializedName(value = "logicalCounter") + private long logicalCounter = 0L; + + // Bit width for each field + private static final int LOGICAL_BITS = 18; // Logical counter bits + private static final int PHYSICAL_BITS = 46; // Physical time bits (milliseconds) + + // Starting bit offset for each field (relative to bit0) + private static final int PHYSICAL_SHIFT = LOGICAL_BITS; // 18 + + // Masks for each field in 64-bit TSO + private static final long 
LOGICAL_MASK = ((1L << LOGICAL_BITS) - 1L); + private static final long PHYSICAL_MASK = ((1L << PHYSICAL_BITS) - 1L) << PHYSICAL_SHIFT; + + // Raw masks for bit operations + private static final long RAW_LOGICAL_MASK = (1L << LOGICAL_BITS) - 1L; + private static final long RAW_PHYSICAL_MASK = (1L << PHYSICAL_BITS) - 1L; + + // Maximum logical counter value + public static final long MAX_LOGICAL_COUNTER = (1L << LOGICAL_BITS) - 1L; + + /** + * Constructor with specific physical time and logical counter + * + * @param physicalTimestamp Physical time in milliseconds + * @param logicalCounter Logical counter value + */ + @VisibleForTesting + public TSOTimestamp(long physicalTimestamp, long logicalCounter) { + if (physicalTimestamp < 0 || logicalCounter < 0) { + throw new IllegalArgumentException("TSO components must be non-negative"); + } + this.physicalTimestamp = physicalTimestamp; + this.logicalCounter = logicalCounter; + } + + public TSOTimestamp(long timestamp) { + this(extractPhysicalTime(timestamp), extractLogicalCounter(timestamp)); + } + + /** + * Default constructor initializes with zero values + */ + @VisibleForTesting + public TSOTimestamp() { + this(0L, 0L); + } + + /** + * Compose 64-bit TSO timestamp from physical time and logical counter + * + * @return 64-bit TSO timestamp + */ + public long composeTimestamp() { + return composeTimestamp(physicalTimestamp, logicalCounter); + } + + /** + * Extract physical time (milliseconds) from TSO timestamp + * + * @param timestamp 64-bit TSO timestamp + * @return Physical time in milliseconds + */ + public static long extractPhysicalTime(long timestamp) { + return (timestamp & PHYSICAL_MASK) >>> PHYSICAL_SHIFT; + } + + /** + * Extract logical counter from TSO timestamp + * + * @param timestamp 64-bit TSO timestamp + * @return Logical counter value + */ + public static long extractLogicalCounter(long timestamp) { + return (timestamp & LOGICAL_MASK); + } + + /** + * Get physical timestamp + * + * @return Physical 
timestamp in milliseconds + */ + public long getPhysicalTimestamp() { + return physicalTimestamp; + } + + /** + * Set physical timestamp + * + * @param physicalTimestamp Physical timestamp in milliseconds + */ + public void setPhysicalTimestamp(long physicalTimestamp) { + if (physicalTimestamp < 0) { + throw new IllegalArgumentException("physicalTimestamp must be non-negative"); + } + this.physicalTimestamp = physicalTimestamp; + } + + /** + * Get logical counter + * + * @return Logical counter value + */ + public long getLogicalCounter() { + return logicalCounter; + } + + /** + * Set logical counter + * + * @param logicalCounter Logical counter value + */ + public void setLogicalCounter(long logicalCounter) { + if (logicalCounter < 0) { + throw new IllegalArgumentException("logicalCounter must be non-negative"); + } + this.logicalCounter = logicalCounter; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this, TSOTimestamp.class); + Text.writeString(out, json); + } + + public static TSOTimestamp read(DataInput dataInput) throws IOException { + String json = Text.readString(dataInput); + TSOTimestamp tsoTimestamp = GsonUtils.GSON.fromJson(json, TSOTimestamp.class); + if (tsoTimestamp == null) { + throw new IOException("failed to deserialize TSOTimestamp from journal/image"); + } + return tsoTimestamp; + } + + /** + * Compose TSO timestamp from physical time and logical counter + * + * @param physicalTime Physical time (milliseconds) + * @param logicalCounter Logical counter + * @return 64-bit TSO timestamp + */ + public static long composeTimestamp(long physicalTime, long logicalCounter) { + // Prevent overflow by masking to appropriate bit widths + long physical = physicalTime & RAW_PHYSICAL_MASK; // Keep only 46 bits + long logical = logicalCounter & RAW_LOGICAL_MASK; // Keep only 18 bits + + // Bitwise assembly: High 46 bits physical time + Low 18 bits logical counter + return (physical << 
PHYSICAL_SHIFT) + | (logical); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("physical timestamp: ").append(physicalTimestamp); + sb.append(", logical counter: ").append(logicalCounter); + + return sb.toString(); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this, TSOTimestamp.class); + } + + /** + * Decompose a composed 64-bit TSO into physical and logical parts + */ + public static TSOTimestamp decompose(long tso) { + long physical = extractPhysicalTime(tso); + long logical = extractLogicalCounter(tso); + return new TSOTimestamp(physical, logical); + } + + @Override + public int compareTo(TSOTimestamp other) { + int cmp = Long.compare(this.physicalTimestamp, other.physicalTimestamp); + return (cmp != 0) ? cmp : Long.compare(this.logicalCounter, other.logicalCounter); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TSOTimestamp)) { + return false; + } + TSOTimestamp t = (TSOTimestamp) o; + return physicalTimestamp == t.physicalTimestamp + && logicalCounter == t.logicalCounter; + } + + @Override + public int hashCode() { + int result = Long.hashCode(physicalTimestamp); + result = 31 * result + Long.hashCode(logicalCounter); + return result; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/TSOActionTest.java b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/TSOActionTest.java new file mode 100644 index 00000000000000..b34f197be2e8e4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/TSOActionTest.java @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.tso.TSOService; +import org.apache.doris.tso.TSOTimestamp; + +import com.google.common.collect.Maps; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.http.ResponseEntity; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class TSOActionTest { + private Env env; + private TestableTSOAction action; + + @Before + public void setUp() { + new EnvMockUp(); + env = Mockito.mock(Env.class); + EnvMockUp.CURRENT_ENV.set(env); + action = new TestableTSOAction(); + } + + @Test + public void testGetTSOEnvNullReturnsBadRequest() { + EnvMockUp.CURRENT_ENV.set(null); + ResponseEntity resp = (ResponseEntity) action.getTSO(mockRequest(), mockResponse()); + ResponseBody body = (ResponseBody) resp.getBody(); + Assert.assertNotNull(body); + Assert.assertEquals(RestApiStatusCode.BAD_REQUEST.code, body.getCode()); + } + + @Test + public void testGetTSOEnvNotReadyReturnsBadRequest() { + Mockito.when(env.isReady()).thenReturn(false); 
+ ResponseEntity resp = (ResponseEntity) action.getTSO(mockRequest(), mockResponse()); + ResponseBody body = (ResponseBody) resp.getBody(); + Assert.assertNotNull(body); + Assert.assertEquals(RestApiStatusCode.BAD_REQUEST.code, body.getCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetTSONotMasterForwardToMasterAndAddMasterAddress() { + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(false); + Mockito.when(env.getMasterHost()).thenReturn("master-fe"); + Mockito.when(env.getMasterHttpPort()).thenReturn(8030); + Map forwarded = Maps.newHashMap(); + forwarded.put("code", 0); + forwarded.put("msg", "OK"); + ((TestableTSOAction) action).setForwardResult(forwarded); + + Object result = action.getTSO(mockRequest(), mockResponse()); + Assert.assertTrue(result instanceof Map); + Map resp = (Map) result; + Assert.assertEquals("master-fe:8030", resp.get("master_fe_addr")); + Assert.assertEquals(0, resp.get("code")); + } + + @Test + public void testGetTSONotMasterKeepResponseEntityWhenForwardingFails() { + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(false); + Mockito.when(env.getMasterHost()).thenReturn("master-fe"); + Mockito.when(env.getMasterHttpPort()).thenReturn(8030); + ((TestableTSOAction) action).setForwardResult(ResponseEntityBuilder.badRequest("forward failed")); + + Object result = action.getTSO(mockRequest(), mockResponse()); + Assert.assertTrue(result instanceof ResponseEntity); + ResponseBody body = (ResponseBody) ((ResponseEntity) result).getBody(); + Assert.assertNotNull(body); + Assert.assertEquals(RestApiStatusCode.BAD_REQUEST.code, body.getCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetTSOSuccessPayloadConsistency() { + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + TSOService tsoService = Mockito.mock(TSOService.class); + 
Mockito.when(env.getTSOService()).thenReturn(tsoService); + Mockito.when(tsoService.getWindowEndTSO()).thenReturn(12345L); + long currentTso = TSOTimestamp.composeTimestamp(1000L, 12L); + Mockito.when(tsoService.getCurrentTSO()).thenReturn(currentTso); + + ResponseEntity resp = (ResponseEntity) action.getTSO(mockRequest(), mockResponse()); + ResponseBody body = (ResponseBody) resp.getBody(); + Assert.assertNotNull(body); + Assert.assertEquals(RestApiStatusCode.OK.code, body.getCode()); + + Map data = (Map) body.getData(); + Assert.assertEquals(12345L, ((Number) data.get("window_end_physical_time")).longValue()); + Assert.assertEquals(currentTso, ((Number) data.get("current_tso")).longValue()); + Assert.assertEquals(TSOTimestamp.extractPhysicalTime(currentTso), + ((Number) data.get("current_tso_physical_time")).longValue()); + Assert.assertEquals(TSOTimestamp.extractLogicalCounter(currentTso), + ((Number) data.get("current_tso_logical_counter")).longValue()); + } + + private static HttpServletRequest mockRequest() { + return Mockito.mock(HttpServletRequest.class); + } + + private static HttpServletResponse mockResponse() { + return Mockito.mock(HttpServletResponse.class); + } + + private static final class TestableTSOAction extends TSOAction { + private Object forwardResult; + + void setForwardResult(Object forwardResult) { + this.forwardResult = forwardResult; + } + + @Override + public ActionAuthorizationInfo executeCheckPassword(HttpServletRequest request, HttpServletResponse response) { + return null; + } + + @Override + public Object forwardToMaster(HttpServletRequest request) { + return forwardResult; + } + } + + private static final class EnvMockUp extends MockUp { + private static final AtomicReference CURRENT_ENV = new AtomicReference<>(); + + @Mock + public static Env getCurrentEnv() { + return CURRENT_ENV.get(); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 365cf8051d82e9..79d32aec6b49ac 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FakeEditLog; import org.apache.doris.catalog.FakeEnv; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -33,6 +34,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.transaction.GlobalTransactionMgrTest.SubTransactionInfo; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.tso.TSOService; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -43,7 +45,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashMap; @@ -592,4 +596,120 @@ private void addSubTransaction() throws UserException { LabelToTxnId.put(CatalogTestUtil.testTxnLabel7, transactionState7.getTransactionId()); LabelToTxnId.put(CatalogTestUtil.testTxnLabel8, transactionState8.getTransactionId()); } + + @Test + public void testCommitTransactionSetsCommitTSOWhenEnableTso() throws Exception { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + try { + Config.enable_tso_feature = true; + FakeEnv.setEnv(masterEnv); + + OlapTable table = (OlapTable) masterEnv.getInternalCatalog() + .getDbOrMetaException(CatalogTestUtil.testDbId1) + .getTableOrMetaException(CatalogTestUtil.testTableId1); + table.setEnableTso(true); + + long expectedCommitTSO = 12345L; + TSOService tsoService = 
Mockito.mock(TSOService.class); + Mockito.when(tsoService.getTSO()).thenReturn(expectedCommitTSO); + setEnvTSOService(masterEnv, tsoService); + + String label = "commitTSO_test_" + System.nanoTime(); + long txnId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + Lists.newArrayList(CatalogTestUtil.testTableId1), + label, transactionSource, TransactionState.LoadJobSourceType.FRONTEND, + Config.stream_load_default_timeout_second); + List transTablets = GlobalTransactionMgrTest.generateTabletCommitInfos( + CatalogTestUtil.testTabletId1, allBackends); + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(table), + txnId, transTablets, null); + + TransactionState transactionState = fakeEditLog.getTransaction(txnId); + Assert.assertNotNull(transactionState); + Assert.assertEquals(expectedCommitTSO, transactionState.getCommitTSO()); + TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(CatalogTestUtil.testTableId1); + Assert.assertNotNull(tableCommitInfo); + Assert.assertEquals(expectedCommitTSO, tableCommitInfo.getCommitTSO()); + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + } + } + + @Test + public void testCommitTransactionCommitTSORemainsMinusOneWhenTableDisableTso() throws Exception { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + try { + Config.enable_tso_feature = true; + FakeEnv.setEnv(masterEnv); + + OlapTable table = (OlapTable) masterEnv.getInternalCatalog() + .getDbOrMetaException(CatalogTestUtil.testDbId1) + .getTableOrMetaException(CatalogTestUtil.testTableId1); + table.setEnableTso(false); + + TSOService tsoService = Mockito.mock(TSOService.class); + Mockito.when(tsoService.getTSO()).thenReturn(12345L); + setEnvTSOService(masterEnv, tsoService); + + String label = "commitTSO_tableDisable_test_" + System.nanoTime(); + long txnId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + 
Lists.newArrayList(CatalogTestUtil.testTableId1), + label, transactionSource, TransactionState.LoadJobSourceType.FRONTEND, + Config.stream_load_default_timeout_second); + List transTablets = GlobalTransactionMgrTest.generateTabletCommitInfos( + CatalogTestUtil.testTabletId1, allBackends); + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(table), + txnId, transTablets, null); + + TransactionState transactionState = fakeEditLog.getTransaction(txnId); + Assert.assertNotNull(transactionState); + Assert.assertEquals(-1L, transactionState.getCommitTSO()); + TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(CatalogTestUtil.testTableId1); + Assert.assertNotNull(tableCommitInfo); + Assert.assertEquals(-1L, tableCommitInfo.getCommitTSO()); + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + } + } + + @Test + public void testCommitTransactionFailsWhenGetTSOInvalid() throws Exception { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + try { + Config.enable_tso_feature = true; + FakeEnv.setEnv(masterEnv); + + OlapTable table = (OlapTable) masterEnv.getInternalCatalog() + .getDbOrMetaException(CatalogTestUtil.testDbId1) + .getTableOrMetaException(CatalogTestUtil.testTableId1); + table.setEnableTso(true); + + TSOService tsoService = Mockito.mock(TSOService.class); + Mockito.when(tsoService.getTSO()).thenReturn(-1L); + setEnvTSOService(masterEnv, tsoService); + + String label = "commitTSO_getTSOInvalid_test_" + System.nanoTime(); + long txnId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + Lists.newArrayList(CatalogTestUtil.testTableId1), + label, transactionSource, TransactionState.LoadJobSourceType.FRONTEND, + Config.stream_load_default_timeout_second); + List transTablets = GlobalTransactionMgrTest.generateTabletCommitInfos( + CatalogTestUtil.testTabletId1, allBackends); + try { + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table), + txnId, transTablets, null); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("failed to get TSO")); + } + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + } + } + + private static void setEnvTSOService(Env env, TSOService service) throws Exception { + Field f = Env.class.getDeclaredField("tsoService"); + f.setAccessible(true); + f.set(env, service); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/tso/TSOServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/tso/TSOServiceTest.java new file mode 100644 index 00000000000000..2398da8e4130b6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/tso/TSOServiceTest.java @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tso; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.journal.Journal; +import org.apache.doris.metric.LongCounterMetric; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.EditLog; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Unit tests for TSOService class. + */ +public class TSOServiceTest { + + private TSOService tsoService; + private Env env; + + private int originalMaxGetTSORetryCount; + private int originalMaxUpdateRetryCount; + private int originalUpdateIntervalMs; + private boolean originalEnableTsoPersistJournal; + private long originalClockBackwardThresholdMs; + + @Before + public void setUp() { + new EnvMockUp(); + + originalMaxGetTSORetryCount = Config.tso_max_get_retry_count; + originalMaxUpdateRetryCount = Config.tso_max_update_retry_count; + originalUpdateIntervalMs = Config.tso_service_update_interval_ms; + originalEnableTsoPersistJournal = Config.enable_tso_persist_journal; + originalClockBackwardThresholdMs = Config.tso_clock_backward_startup_threshold_ms; + + Config.tso_max_get_retry_count = 1; + Config.tso_max_update_retry_count = 1; + Config.tso_service_update_interval_ms = 1; + Config.enable_tso_persist_journal = false; + Config.tso_clock_backward_startup_threshold_ms = 30L * 60 * 1000; + + env = Mockito.mock(Env.class); + EnvMockUp.CURRENT_ENV.set(env); + + tsoService = new TSOService(); + } + + @After + public void tearDown() { + EnvMockUp.CURRENT_ENV.set(null); + Config.tso_max_get_retry_count = originalMaxGetTSORetryCount; + Config.tso_max_update_retry_count = originalMaxUpdateRetryCount; + 
Config.tso_service_update_interval_ms = originalUpdateIntervalMs; + Config.enable_tso_persist_journal = originalEnableTsoPersistJournal; + Config.tso_clock_backward_startup_threshold_ms = originalClockBackwardThresholdMs; + } + + @Test + public void testConstructor() { + TSOService service = new TSOService(); + Assert.assertNotNull(service); + } + + @Test + public void testGetCurrentTSO() { + TSOService service = new TSOService(); + long currentTSO = service.getCurrentTSO(); + // Should be 0 since not initialized + Assert.assertEquals(0L, currentTSO); + } + + @Test + public void testGetTSOThrowsWhenEnvNotReady() { + setInitializedFlag(tsoService, true); + Mockito.when(env.isReady()).thenReturn(false); + try { + tsoService.getTSO(); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Failed to get TSO")); + } + } + + @Test + public void testGetTSOThrowsWhenNotCalibrated() throws Exception { + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + try { + tsoService.getTSO(); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("not calibrated")); + } + } + + @Test + public void testGetTSOThrowsOnLogicalOverflow() throws Exception { + setInitializedFlag(tsoService, true); + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + setGlobalTimestamp(tsoService, 100L, TSOTimestamp.MAX_LOGICAL_COUNTER); + try { + tsoService.getTSO(); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Failed to get TSO")); + Assert.assertNotNull(e.getCause()); + Assert.assertTrue(e.getCause().getMessage().contains("logical counter overflow")); + Assert.assertEquals(TSOTimestamp.MAX_LOGICAL_COUNTER, getGlobalLogicalCounter(tsoService)); + } + } + + @Test + public void testGetTSOAcceptsLogicalCounterUpperBound() throws Exception { + setInitializedFlag(tsoService, true); + 
Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + setGlobalTimestamp(tsoService, 100L, TSOTimestamp.MAX_LOGICAL_COUNTER - 1); + long tso = tsoService.getTSO(); + Assert.assertEquals(TSOTimestamp.composeTimestamp(100L, TSOTimestamp.MAX_LOGICAL_COUNTER), tso); + } + + @Test + public void testRunAfterCatalogReadySetsIntervalTo50WhenDisabled() { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + try { + setInitializedFlag(tsoService, true); + Config.enable_tso_feature = false; + tsoService.runAfterCatalogReady(); + Assert.assertEquals(1L, tsoService.getInterval()); + try { + tsoService.getTSO(); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("not calibrated")); + } + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + } + } + + @Test + public void testRunAfterCatalogReadyUsesAtLeastOneRetryWhenConfigNonPositive() { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + try { + Config.enable_tso_feature = true; + Config.tso_max_update_retry_count = 0; + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + tsoService.runAfterCatalogReady(); + Assert.assertTrue(tsoService.getTSO() > 0); + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + } + } + + @Test + public void testRunAfterCatalogReadyUpdateFailureDoesNotTouchMetricWhenNotInit() throws Exception { + boolean originalEnableTsoFeature = Config.enable_tso_feature; + boolean originalMetricInit = MetricRepo.isInit; + LongCounterMetric originalUpdateFailedMetric = MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED; + try { + Config.enable_tso_feature = true; + setInitializedFlag(tsoService, true); + setGlobalTimestamp(tsoService, 100L, 1L); + MetricRepo.isInit = false; + MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED = null; + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenThrow(new 
RuntimeException("injected update failure")); + tsoService.runAfterCatalogReady(); + } finally { + Config.enable_tso_feature = originalEnableTsoFeature; + MetricRepo.isInit = originalMetricInit; + MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED = originalUpdateFailedMetric; + } + } + + @Test + public void testReplayWindowEndTSOUpdatesServiceState() { + long windowEnd = 12345L; + tsoService.replayWindowEndTSO(new TSOTimestamp(windowEnd, 0L)); + Assert.assertEquals(windowEnd, tsoService.getWindowEndTSO()); + } + + @Test + public void testWriteTimestampToBdbJeSkipsWhenEnvNotReady() throws Exception { + EditLog editLog = Mockito.mock(EditLog.class); + Mockito.when(env.isReady()).thenReturn(false); + Mockito.when(env.getEditLog()).thenReturn(editLog); + + invokeWriteTimestampToBdbJe(tsoService, 123L); + Mockito.verifyNoInteractions(editLog); + } + + @Test + public void testWriteTimestampToBdbJeWritesWhenEnabledAndJournalReady() throws Exception { + EditLog editLog = Mockito.mock(EditLog.class); + Journal journal = Mockito.mock(Journal.class); + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + Mockito.when(env.getEditLog()).thenReturn(editLog); + Mockito.when(editLog.getJournal()).thenReturn(journal); + + Config.enable_tso_persist_journal = true; + invokeWriteTimestampToBdbJe(tsoService, 123L); + Mockito.verify(editLog).logTSOTimestampWindowEnd(Mockito.any(TSOTimestamp.class)); + } + + @Test + public void testCalibrateTimestampThrowsWhenClockBackwardExceedsThreshold() throws Exception { + Mockito.when(env.isReady()).thenReturn(true); + Mockito.when(env.isMaster()).thenReturn(true); + long now = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + tsoService.replayWindowEndTSO(new TSOTimestamp( + now + Config.tso_clock_backward_startup_threshold_ms + 60_000, 0L)); + try { + invokeCalibrateTimestamp(tsoService); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("clock 
backward too much")); + } + } + + private static void invokeWriteTimestampToBdbJe(TSOService service, long timestamp) throws Exception { + Method m = TSOService.class.getDeclaredMethod("writeTimestampToBDBJE", long.class); + m.setAccessible(true); + m.invoke(service, timestamp); + } + + private static void invokeCalibrateTimestamp(TSOService service) throws Exception { + Method m = TSOService.class.getDeclaredMethod("calibrateTimestamp"); + m.setAccessible(true); + try { + m.invoke(service); + } catch (InvocationTargetException e) { + Throwable cause = e.getTargetException(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw e; + } + } + + private static void setGlobalTimestamp(TSOService service, long physical, long logical) throws Exception { + Field f = TSOService.class.getDeclaredField("globalTimestamp"); + f.setAccessible(true); + TSOTimestamp timestamp = (TSOTimestamp) f.get(service); + timestamp.setPhysicalTimestamp(physical); + timestamp.setLogicalCounter(logical); + } + + private static long getGlobalLogicalCounter(TSOService service) throws Exception { + Field f = TSOService.class.getDeclaredField("globalTimestamp"); + f.setAccessible(true); + TSOTimestamp timestamp = (TSOTimestamp) f.get(service); + return timestamp.getLogicalCounter(); + } + + private static void setInitializedFlag(TSOService service, boolean initialized) { + try { + Field f = TSOService.class.getDeclaredField("isInitialized"); + f.setAccessible(true); + ((java.util.concurrent.atomic.AtomicBoolean) f.get(service)).set(initialized); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final class EnvMockUp extends MockUp { + private static final AtomicReference CURRENT_ENV = new AtomicReference<>(); + + @Mock + public static Env getCurrentEnv() { + return CURRENT_ENV.get(); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/tso/TSOTimestampTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/tso/TSOTimestampTest.java new file mode 100644 index 00000000000000..5d351ddba70b99 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/tso/TSOTimestampTest.java @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tso; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +/** + * Unit tests for TSOTimestamp class. 
+ */ +public class TSOTimestampTest { + + @Test + public void testConstructor() { + // Test default constructor + TSOTimestamp timestamp = new TSOTimestamp(); + Assert.assertEquals(0L, timestamp.getPhysicalTimestamp()); + Assert.assertEquals(0L, timestamp.getLogicalCounter()); + + // Test constructor with parameters + long physicalTime = 1625097600000L; + long logicalCounter = 123L; + timestamp = new TSOTimestamp(physicalTime, logicalCounter); + Assert.assertEquals(physicalTime, timestamp.getPhysicalTimestamp()); + Assert.assertEquals(logicalCounter, timestamp.getLogicalCounter()); + } + + @Test + public void testComposeAndExtractTimestamp() { + long physicalTime = 1625097600000L; + long logicalCounter = 123L; + + TSOTimestamp timestamp = new TSOTimestamp(physicalTime, logicalCounter); + long composed = timestamp.composeTimestamp(); + + // Verify extraction works correctly + Assert.assertEquals(physicalTime, TSOTimestamp.extractPhysicalTime(composed)); + Assert.assertEquals(logicalCounter, TSOTimestamp.extractLogicalCounter(composed)); + } + + @Test + public void testBitWidthLimitations() { + // Test that values are properly masked to fit in their respective bit widths + long largePhysicalTime = (1L << 46) + 1000L; // Larger than 46 bits + long largeLogicalCounter = (1L << 18) + 50L; // Larger than 18 bits + + TSOTimestamp timestamp = new TSOTimestamp(largePhysicalTime, largeLogicalCounter); + long composed = timestamp.composeTimestamp(); + + // Values should be masked to fit in their respective bit widths + Assert.assertEquals(largePhysicalTime & ((1L << 46) - 1), TSOTimestamp.extractPhysicalTime(composed)); + Assert.assertEquals(largeLogicalCounter & ((1L << 18) - 1), TSOTimestamp.extractLogicalCounter(composed)); + } + + @Test + public void testSetterAndGetters() { + TSOTimestamp timestamp = new TSOTimestamp(); + + long physicalTime = 1625097600000L; + long logicalCounter = 456L; + + timestamp.setPhysicalTimestamp(physicalTime); + 
timestamp.setLogicalCounter(logicalCounter); + + Assert.assertEquals(physicalTime, timestamp.getPhysicalTimestamp()); + Assert.assertEquals(logicalCounter, timestamp.getLogicalCounter()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorRejectNegativePhysicalTimestamp() { + new TSOTimestamp(-1L, 0L); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorRejectNegativeLogicalCounter() { + new TSOTimestamp(0L, -1L); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetterRejectNegativePhysicalTimestamp() { + TSOTimestamp timestamp = new TSOTimestamp(); + timestamp.setPhysicalTimestamp(-1L); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetterRejectNegativeLogicalCounter() { + TSOTimestamp timestamp = new TSOTimestamp(); + timestamp.setLogicalCounter(-1L); + } + + @Test + public void testDecomposeIsConsistentWithExtract() { + long physicalTime = 1625097600000L; + long logicalCounter = 123L; + long tso = TSOTimestamp.composeTimestamp(physicalTime, logicalCounter); + + TSOTimestamp decomposed = TSOTimestamp.decompose(tso); + Assert.assertEquals(TSOTimestamp.extractPhysicalTime(tso), decomposed.getPhysicalTimestamp()); + Assert.assertEquals(TSOTimestamp.extractLogicalCounter(tso), decomposed.getLogicalCounter()); + } + + @Test + public void testWritableRoundTrip() throws Exception { + TSOTimestamp timestamp = new TSOTimestamp(1625097600000L, 123L); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + timestamp.write(dos); + dos.flush(); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + TSOTimestamp restored = TSOTimestamp.read(dis); + + Assert.assertEquals(timestamp, restored); + Assert.assertEquals(timestamp.hashCode(), restored.hashCode()); + Assert.assertEquals(0, timestamp.compareTo(restored)); + } + + @Test + 
public void testCompareToOrdersByPhysicalThenLogical() { + TSOTimestamp a = new TSOTimestamp(100L, 2L); + TSOTimestamp b = new TSOTimestamp(100L, 3L); + TSOTimestamp c = new TSOTimestamp(101L, 0L); + Assert.assertTrue(a.compareTo(b) < 0); + Assert.assertTrue(b.compareTo(c) < 0); + Assert.assertTrue(a.compareTo(c) < 0); + } + + @Test + public void testMaxLogicalCounter() { + // Test the maximum logical counter value + Assert.assertEquals((1L << 18) - 1, TSOTimestamp.MAX_LOGICAL_COUNTER); + } +} diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 0d4359fe16215f..c6760811345f8e 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -165,6 +165,8 @@ message RowsetMetaPB { optional bool is_recycled = 1013; // for recycler mark rowset as recycled optional string job_id = 1014; + + optional int64 commit_tso = 1015 [default = -1]; } message SchemaDictKeyList { @@ -270,6 +272,8 @@ message RowsetMetaCloudPB { optional bool is_recycled = 112; optional string job_id = 113; + optional int64 commit_tso = 114 [default = -1]; + } message SegmentStatisticsPB { @@ -721,6 +725,7 @@ message BinlogMetaEntryPB { message PendingPublishInfoPB { optional int64 partition_id = 1; optional int64 transaction_id = 2; + optional int64 commit_tso = 3; } message RowsetBinlogMetasPB { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index c8c65513dc26e3..61ee5a0d144216 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -459,6 +459,7 @@ struct TPartitionVersionInfo { 1: required Types.TPartitionId partition_id 2: required Types.TVersion version 3: required Types.TVersionHash version_hash // Deprecated + 4: optional i64 commit_tso = -1 } struct TMoveDirReq { diff --git a/regression-test/data/query_p0/system/test_query_sys_rowsets.out b/regression-test/data/query_p0/system/test_query_sys_rowsets.out index d9e5a070d1c65e..f41afa34301453 100644 --- 
a/regression-test/data/query_p0/system/test_query_sys_rowsets.out +++ b/regression-test/data/query_p0/system/test_query_sys_rowsets.out @@ -13,6 +13,7 @@ DATA_DISK_SIZE bigint Yes false \N CREATION_TIME datetime Yes false \N NEWEST_WRITE_TIMESTAMP datetime Yes false \N SCHEMA_VERSION int Yes false \N +COMMIT_TSO bigint Yes false \N -- !rowsets1 -- 0 1 diff --git a/regression-test/data/query_p0/system/test_query_sys_scan_rowsets.out b/regression-test/data/query_p0/system/test_query_sys_scan_rowsets.out index b034ebe1bbdc69..94520db1e07560 100644 --- a/regression-test/data/query_p0/system/test_query_sys_scan_rowsets.out +++ b/regression-test/data/query_p0/system/test_query_sys_scan_rowsets.out @@ -13,6 +13,7 @@ DATA_DISK_SIZE bigint Yes false \N CREATION_TIME datetime Yes false \N NEWEST_WRITE_TIMESTAMP datetime Yes false \N SCHEMA_VERSION int Yes false \N +COMMIT_TSO bigint Yes false \N -- !rowsets1 -- diff --git a/regression-test/data/query_p0/system/test_table_properties.out b/regression-test/data/query_p0/system/test_table_properties.out index 497f300612039f..5f34cd069f2b3e 100644 --- a/regression-test/data/query_p0/system/test_table_properties.out +++ b/regression-test/data/query_p0/system/test_table_properties.out @@ -1,6 +1,6 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this -- !select_check_1 -- -114 +117 -- !select_check_2 -- internal test_table_properties_db duplicate_table _auto_bucket false @@ -16,6 +16,7 @@ internal test_table_properties_db duplicate_table deprecated_variant_enable_flat internal test_table_properties_db duplicate_table disable_auto_compaction false internal test_table_properties_db duplicate_table enable_mow_light_delete false internal test_table_properties_db duplicate_table enable_single_replica_compaction false +internal test_table_properties_db duplicate_table enable_tso false internal test_table_properties_db duplicate_table enable_unique_key_merge_on_write false internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 @@ -53,6 +54,7 @@ internal test_table_properties_db listtable deprecated_variant_enable_flatten_ne internal test_table_properties_db listtable disable_auto_compaction false internal test_table_properties_db listtable enable_mow_light_delete false internal test_table_properties_db listtable enable_single_replica_compaction false +internal test_table_properties_db listtable enable_tso false internal test_table_properties_db listtable enable_unique_key_merge_on_write false internal test_table_properties_db listtable file_cache_ttl_seconds 0 internal test_table_properties_db listtable group_commit_data_bytes 134217728 @@ -90,6 +92,7 @@ internal test_table_properties_db unique_table deprecated_variant_enable_flatten internal test_table_properties_db unique_table disable_auto_compaction false internal test_table_properties_db unique_table enable_mow_light_delete false internal test_table_properties_db unique_table enable_single_replica_compaction false +internal test_table_properties_db unique_table enable_tso false internal test_table_properties_db unique_table enable_unique_key_merge_on_write true internal test_table_properties_db unique_table 
file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 @@ -129,6 +132,7 @@ internal test_table_properties_db duplicate_table deprecated_variant_enable_flat internal test_table_properties_db duplicate_table disable_auto_compaction false internal test_table_properties_db duplicate_table enable_mow_light_delete false internal test_table_properties_db duplicate_table enable_single_replica_compaction false +internal test_table_properties_db duplicate_table enable_tso false internal test_table_properties_db duplicate_table enable_unique_key_merge_on_write false internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 @@ -166,6 +170,7 @@ internal test_table_properties_db unique_table deprecated_variant_enable_flatten internal test_table_properties_db unique_table disable_auto_compaction false internal test_table_properties_db unique_table enable_mow_light_delete false internal test_table_properties_db unique_table enable_single_replica_compaction false +internal test_table_properties_db unique_table enable_tso false internal test_table_properties_db unique_table enable_unique_key_merge_on_write true internal test_table_properties_db unique_table file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 @@ -207,6 +212,7 @@ internal test_table_properties_db duplicate_table deprecated_variant_enable_flat internal test_table_properties_db duplicate_table disable_auto_compaction false internal test_table_properties_db duplicate_table enable_mow_light_delete false internal test_table_properties_db duplicate_table enable_single_replica_compaction false +internal test_table_properties_db duplicate_table enable_tso false internal test_table_properties_db duplicate_table enable_unique_key_merge_on_write false internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 
internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 diff --git a/regression-test/suites/tso_p0/test_tso_api.groovy b/regression-test/suites/tso_p0/test_tso_api.groovy new file mode 100644 index 00000000000000..2bed26fdcf44bb --- /dev/null +++ b/regression-test/suites/tso_p0/test_tso_api.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.util.Http +import groovy.json.JsonSlurper + +suite("test_tso_api", "nonConcurrent") { + def ret = sql "SHOW FRONTEND CONFIG like '%experimental_enable_tso_feature%';" + logger.info("${ret}") + try { + sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_tso_feature' = 'true')" + sleep(1000) + def currentTime = System.currentTimeMillis() + + // Test TSO API endpoint + def url = String.format("http://%s/api/tso", context.config.feHttpAddress) + + // Test 1: Basic TSO API access + def result = Http.GET(url, true) + logger.info("TSO API response: ${result}") + + assertTrue(result.code == 0) + assertEquals(result.msg, "success") + + // Check that all expected fields are present in the response + def data = result.data + assertTrue(data.containsKey("window_end_physical_time")) + assertTrue(data.containsKey("current_tso")) + assertTrue(data.containsKey("current_tso_physical_time")) + assertTrue(data.containsKey("current_tso_logical_counter")) + + // Validate that TSO values are reasonable + assertTrue(data.window_end_physical_time > 0) + assertTrue(data.current_tso > 0) + assertTrue(data.current_tso_physical_time > 0) + assertTrue(data.current_tso_logical_counter >= 0) + + // Test 2: Multiple TSO API calls should return consistent increasing values + def result1 = Http.GET(url, true) + Thread.sleep(10) // Small delay to ensure time progression + def result2 = Http.GET(url, true) + + assertTrue(result1.code == 0) + assertTrue(result2.code == 0) + + def tso1 = result1.data.current_tso + def tso2 = result2.data.current_tso + + // TSO should be monotonically increasing + assertTrue(tso2 >= tso1) + + // Test 3: Validate TSO timestamp structure + def physicalTime1 = result1.data.current_tso_physical_time + def logicalCounter1 = result1.data.current_tso_logical_counter + def physicalTime2 = result2.data.current_tso_physical_time + def logicalCounter2 = result2.data.current_tso_logical_counter + + // Physical time should be consistent with 
TSO + assertTrue(physicalTime1 <= tso1) + assertTrue(physicalTime2 <= tso2) + + // Test 4: Validate window end time is in the future + def windowEndTime = result1.data.window_end_physical_time + assertTrue(windowEndTime >= currentTime) + + // Test 5: Test unauthorized access (without credentials) + try { + def unauthorizedResult = Http.GET(url, false) // false means no auth + // Depending on server configuration, this might return 401 or still work + logger.info("Unauthorized access result: ${unauthorizedResult}") + } catch (Exception e) { + logger.info("Expected unauthorized access exception: ${e.getMessage()}") + } + + // Test 6: Validate TSO timestamp composition + def tsoValue = result1.data.current_tso + def physicalTime = result1.data.current_tso_physical_time + def logicalCounter = result1.data.current_tso_logical_counter + + // Validate that the TSO is composed correctly from physical time and logical counter + // TSO format: 46 bits physical time + 18 bits logical counter + def expectedTSO = (physicalTime << 18) | (logicalCounter & 0x3FFFFL) + // Note: We're not checking exact equality because of the reserved bits in the middle + + // At least verify that the physical time part matches + def extractedPhysicalTime = (tsoValue >> 18) & 0x3FFFFFFFFFFL // 46 bits mask + assertEquals(physicalTime, extractedPhysicalTime) + + // And that the logical counter part matches (lowest 18 bits) + def extractedLogicalCounter = tsoValue & 0x3FFFFL // 18 bits mask + assertEquals(logicalCounter, extractedLogicalCounter) + } finally { + sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_tso_feature' = '${ret[0][1]}')" + } + + logger.info("TSO API test completed successfully") +} \ No newline at end of file diff --git a/regression-test/suites/tso_p0/test_tso_rowset_commit_tso.groovy b/regression-test/suites/tso_p0/test_tso_rowset_commit_tso.groovy new file mode 100644 index 00000000000000..eae3630b3f9227 --- /dev/null +++ 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.util.Http

// Verifies that a rowset produced by an insert into an enable_tso table carries
// a COMMIT_TSO (visible via information_schema.rowsets) that is positive and not
// smaller than a TSO fetched from the FE before the load. Restores the
// experimental_enable_tso_feature config on exit.
suite("test_tso_rowset_commit_tso", "nonConcurrent") {
    // Remember the current switch value so the finally block can restore it.
    def ret = sql "SHOW FRONTEND CONFIG like '%experimental_enable_tso_feature%';"
    logger.info("${ret}")
    try {
        sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_tso_feature' = 'true')"
        sleep(1000)
        def url = String.format("http://%s/api/tso", context.config.feHttpAddress)
        def tsoResp = Http.GET(url, true)
        if (tsoResp.code != 0) {
            // Endpoint not available on this deployment; nothing to verify.
            logger.info("tso api not available, skip test_tso_rowset_commit_tso")
            return
        }

        def tableName = "test_tso_rowset_commit_tso"
        sql """DROP TABLE IF EXISTS ${tableName}"""
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                id INT
            )
            DISTRIBUTED BY HASH(id) BUCKETS 1
            PROPERTIES ("replication_num" = "1", "enable_tso" = "true", "disable_auto_compaction" = "true")
        """

        sql """INSERT INTO ${tableName} VALUES (1), (2), (3)"""

        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
        assertTrue(tablets.size() > 0)
        def tabletId = tablets[0]["TabletId"]

        // The commit TSO may be published asynchronously, so poll for up to ~10s.
        def commitTso = -1L
        for (int i = 0; i < 10; i++) {
            def rowsets = sql_return_maparray """
                select COMMIT_TSO from information_schema.rowsets
                where TABLET_ID = ${tabletId}
                order by TXN_ID desc limit 1
            """
            // Guard against a NULL COMMIT_TSO: casting null to Number would throw
            // an NPE and abort the retry loop instead of retrying.
            if (rowsets.size() > 0 && rowsets[0]["COMMIT_TSO"] != null) {
                commitTso = ((Number) rowsets[0]["COMMIT_TSO"]).longValue()
            }
            if (commitTso > 0) {
                break
            }
            Thread.sleep(1000)
        }

        assertTrue(commitTso > 0)
        // The load committed after tsoResp was fetched, so its TSO must not be smaller.
        assertTrue(commitTso >= ((Number) tsoResp.data.current_tso).longValue())

        sql """DROP TABLE IF EXISTS ${tableName}"""
    } finally {
        // Restore whatever value the config had before this suite ran.
        sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_tso_feature' = '${ret[0][1]}')"
    }
}