From 50ab9d5b04e868806c12a5e263cf9fa581775748 Mon Sep 17 00:00:00 2001
From: wyxxxcat
Date: Tue, 17 Mar 2026 15:27:05 +0800
Subject: [PATCH] Fix always-false tablet_job_id check in
 check_idempotent_for_txn_or_job

When enable_recycle_delete_rowset_key_check is false, the guard
`tablet_job_id.empty() && !tablet_job_id.empty()` could never be true, so
check_job_existed() was never called on that path. Check
`!tablet_job_id.empty()` instead, and reword the comments and the warning
log on that path from "prepare rowset" to "commit rowset".

Tests:
- add MetaServiceJobTest.CheckIdempotentWithTabletJobId covering the fixed
  condition (job present, job aborted, no tablet_job_id);
- call prepare_rowset() before commit_rowset() in the existing tests;
- give prepare_rowset() in meta_service_test.cpp external linkage so other
  test files can declare and use it;
- make DetachSchemaKVTest.InsertExistedRowsetTest use a txn id returned by
  begin_txn instead of the hard-coded 10005.

---
 cloud/src/meta-service/meta_service.cpp       |   8 +-
 cloud/test/meta_service_job_test.cpp          | 143 ++++++++++++++++++
 .../test/meta_service_operation_log_test.cpp  |   6 +-
 cloud/test/meta_service_test.cpp              |   4 +-
 .../test/meta_service_versioned_read_test.cpp |   4 +
 cloud/test/schema_kv_test.cpp                 |  27 +++-
 6 files changed, 182 insertions(+), 10 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 23927ba99b3c99..903d77f8092ced 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2659,22 +2659,22 @@ int check_idempotent_for_txn_or_job(Transaction* txn, const std::string& recycle
             return -1;
         }
     } else if (!config::enable_recycle_delete_rowset_key_check) {
-        if (config::enable_tablet_job_check && tablet_job_id.empty() && !tablet_job_id.empty()) {
+        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
             if (!check_job_existed(txn, code, msg, instance_id, tablet_id, rowset_id, tablet_job_id,
                                    is_versioned_read, resource_mgr)) {
                 return 1;
             }
         }
 
-        // Check if the prepare rowset request is invalid.
-        // If the transaction has been finished, it means this prepare rowset is a timeout retry request.
+        // Check if the commit rowset request is invalid.
+        // If the transaction has been finished, it means this commit rowset is a timeout retry request.
         // In this case, do not write the recycle key again, otherwise it may cause data loss.
         // If the rowset had load id, it means it is a load request, otherwise it is a
         // compaction/sc request.
         if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
             !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, instance_id,
                                       rowset_meta.txn_id(), code, msg)) {
-            LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id()
+            LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
                          << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
                          << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
             return 1;
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index 65527e6a86973f..4ecfaea9d44360 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1306,6 +1306,7 @@ TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
         auto tmp_rowset = create_rowset(tablet_id, tc.start_version, tc.end_version, 100);
         tmp_rowset.set_txn_id(txn_id);
         CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), tmp_rowset, res, txn_id);
         commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
@@ -1486,6 +1487,7 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
             output_rowsets.push_back(rowset);
             CreateRowsetResponse res;
+            prepare_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + i);
             commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + i);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
         }
 
@@ -6768,4 +6770,145 @@ TEST(MetaServiceJobTest, DeleteJobForRelatedRowsetTest) {
     }
 }
 
+// Test: Verify that check_idempotent_for_txn_or_job correctly calls check_job_existed
+// when enable_recycle_delete_rowset_key_check is false and tablet_job_id is non-empty.
+// This covers the bug fix where the condition was:
+//   tablet_job_id.empty() && !tablet_job_id.empty()  (always false, check never ran)
+// Fixed to:
+//   !tablet_job_id.empty()
+TEST(MetaServiceJobTest, CheckIdempotentWithTabletJobId) {
+    auto meta_service = get_meta_service();
+    auto* sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+        config::enable_recycle_delete_rowset_key_check = true;
+    };
+    // Disable recycle_delete_rowset_key_check so we enter the else-if branch
+    // in check_idempotent_for_txn_or_job where the bug existed.
+    config::enable_recycle_delete_rowset_key_check = false;
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    int64_t table_id = 13440;
+    int64_t index_id = 13450;
+    int64_t partition_id = 13460;
+    int64_t tablet_id = 13470;
+
+    create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id);
+
+    std::string job_id = "test_check_idempotent_job";
+
+    // Step 1: Create input rowsets for compaction
+    {
+        std::vector<doris::RowsetMetaCloudPB> input_rowsets;
+        input_rowsets.push_back(create_rowset(tablet_id, 2, 2, 100));
+        input_rowsets.push_back(create_rowset(tablet_id, 3, 3, 100));
+        input_rowsets[0].set_resource_id(std::string(RESOURCE_ID));
+        input_rowsets[1].set_resource_id(std::string(RESOURCE_ID));
+        insert_rowsets(meta_service->txn_kv().get(), table_id, index_id, partition_id, tablet_id,
+                       input_rowsets);
+    }
+
+    // Step 2: Start a compaction job
+    {
+        StartTabletJobResponse res;
+        start_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
+                             TabletCompactionJobPB::CUMULATIVE, res, {2, 3});
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 3: Prepare rowset with tablet_job_id
+    doris::RowsetMetaCloudPB rowset_meta;
+    {
+        rowset_meta = create_rowset(tablet_id, 4, 4, 200);
+        rowset_meta.set_job_id(job_id);
+        rowset_meta.set_resource_id(std::string(RESOURCE_ID));
+
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+        req->set_tablet_job_id(job_id);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 4: commit_rowset with tablet_job_id while job still exists - should succeed
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+        req->set_tablet_job_id(job_id);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+                << "commit_rowset should succeed when job exists, msg=" << res.status().msg();
+    }
+
+    // Step 5: Abort the compaction job (removes the job entry from TabletJobInfoPB)
+    {
+        FinishTabletJobResponse res;
+        finish_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
+                              TabletCompactionJobPB::CUMULATIVE, res, FinishTabletJobRequest::ABORT,
+                              {2, 3});
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 6: Prepare a new rowset (without tablet_job_id to bypass prepare check)
+    doris::RowsetMetaCloudPB rowset_meta2;
+    {
+        rowset_meta2 = create_rowset(tablet_id, 5, 5, 200);
+        rowset_meta2.set_job_id(job_id);
+        rowset_meta2.set_resource_id(std::string(RESOURCE_ID));
+
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 7: commit_rowset with tablet_job_id after job aborted - should fail.
+    // Before the fix, this would incorrectly succeed because check_job_existed was
+    // never called (the condition was always false).
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        req->set_tablet_job_id(job_id);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET)
+                << "commit_rowset should fail with STALE_PREPARE_ROWSET when job is aborted, msg="
+                << res.status().msg();
+    }
+
+    // Step 8: commit_rowset without tablet_job_id - should succeed (skips job check)
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        // Do NOT set tablet_job_id - the check should be skipped
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+                << "commit_rowset without tablet_job_id should succeed, msg=" << res.status().msg();
+    }
+}
+
 } // namespace doris::cloud
diff --git a/cloud/test/meta_service_operation_log_test.cpp b/cloud/test/meta_service_operation_log_test.cpp
index 5a5c46a7603c23..ee687cf532ef21 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -47,6 +47,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int6
                           int64_t partition_id, int64_t tablet_id);
 extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id,
                                               int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
+                           CreateRowsetResponse& res);
 extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res);
 extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
@@ -841,7 +843,7 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
             LOG(INFO) << "Creating rowset for tablet_id=" << (tablet_id_base + i)
                       << ", partition_id=" << partition_id << ", txn_id=" << txn_id
                       << ", rowset=" << tmp_rowset.ShortDebugString();
-
+            prepare_rowset(meta_service.get(), tmp_rowset, res);
             commit_rowset(meta_service.get(), tmp_rowset, res);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
         }
@@ -1048,6 +1050,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
     create_tablet(meta_service.get(), table_id, 1237, partition_id, tablet_id_base);
     auto tmp_rowset = create_rowset(txn_id, tablet_id_base, partition_id, 1, 100);
     CreateRowsetResponse res;
+    prepare_rowset(meta_service.get(), tmp_rowset, res);
     commit_rowset(meta_service.get(), tmp_rowset, res);
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
 
@@ -1273,6 +1276,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
         create_tablet(meta_service.get(), table_id, 1238, partition_id, tablet_id_base + i);
         auto tmp_rowset = create_rowset(sub_txn_id, tablet_id_base + i, partition_id, 1, 100);
         CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), tmp_rowset, res);
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 2b057a0b25830b..0846cc9e105e18 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -253,8 +253,8 @@ doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int pa
     return rowset;
 }
 
-static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
-                           CreateRowsetResponse& res) {
+void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
+                    CreateRowsetResponse& res) {
     brpc::Controller cntl;
     auto arena = res.GetArena();
     auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
diff --git a/cloud/test/meta_service_versioned_read_test.cpp b/cloud/test/meta_service_versioned_read_test.cpp
index 587930117467a2..7ba77a1d3aa9ec 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -46,6 +46,7 @@
 #include "meta-store/versioned_value.h"
 #include "mock_resource_manager.h"
 #include "rate-limiter/rate_limiter.h"
+#include "recycler/util.h"
 #include "resource-manager/resource_manager.h"
 
 namespace doris::cloud {
@@ -57,6 +58,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int6
                           int64_t partition_id, int64_t tablet_id);
 extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id,
                                               int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
+                           CreateRowsetResponse& res);
 extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res);
 extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
@@ -205,6 +208,7 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
         create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id, -1, 100);
         CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), tmp_rowset, res);
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
diff --git a/cloud/test/schema_kv_test.cpp b/cloud/test/schema_kv_test.cpp
index 54733a29a65c94..32a9302a0150e4 100644
--- a/cloud/test/schema_kv_test.cpp
+++ b/cloud/test/schema_kv_test.cpp
@@ -651,6 +651,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
     auto sp = SyncPoint::get_instance();
     DORIS_CLOUD_DEFER {
         SyncPoint::get_instance()->clear_all_call_backs();
+        config::enable_recycle_delete_rowset_key_check = true;
     };
     sp->set_call_back("get_instance_id", [&](auto&& args) {
         auto* ret = try_any_cast_ret<std::string>(args);
@@ -658,6 +659,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
         ret->second = true;
     });
     sp->enable_processing();
+    config::enable_recycle_delete_rowset_key_check = false;
 
     int64_t db_id = 1000;
 
@@ -667,17 +669,36 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
     config::write_schema_kv = false;
     ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
                                           partition_id, tablet_id, next_rowset_id(), 1));
+
+    int64_t txn_id = -1;
+    std::string label = "test_abort_txn_label";
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        BeginTxnResponse res;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        auto* txn_info = req.mutable_txn_info();
+        txn_info->set_db_id(db_id);
+        txn_info->set_label(label);
+        txn_info->add_table_ids(table_id);
+        txn_info->set_timeout_ms(36000);
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+        LOG(INFO) << "Step 1: Transaction started, txn_id=" << txn_id;
+    }
     std::unique_ptr<Transaction> txn;
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    auto committed_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
+    auto committed_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
     std::string tmp_rowset_key, tmp_rowset_val;
     // 0:instance_id 1:txn_id 2:tablet_id
-    meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
+    meta_rowset_tmp_key({instance_id, txn_id, tablet_id}, &tmp_rowset_key);
     ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
     txn->put(tmp_rowset_key, tmp_rowset_val);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     CreateRowsetResponse res;
-    auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
+    auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
     prepare_rowset(meta_service.get(), new_rowset, res);
     ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
     ASSERT_TRUE(res.has_existed_rowset_meta());