Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2659,22 +2659,22 @@ int check_idempotent_for_txn_or_job(Transaction* txn, const std::string& recycle
return -1;
}
} else if (!config::enable_recycle_delete_rowset_key_check) {
if (config::enable_tablet_job_check && tablet_job_id.empty() && !tablet_job_id.empty()) {
if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
if (!check_job_existed(txn, code, msg, instance_id, tablet_id, rowset_id, tablet_job_id,
is_versioned_read, resource_mgr)) {
return 1;
}
}

// Check if the prepare rowset request is invalid.
// If the transaction has been finished, it means this prepare rowset is a timeout retry request.
// Check if the commit rowset request is invalid.
// If the transaction has been finished, it means this commit rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
// If the rowset had load id, it means it is a load request, otherwise it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, instance_id,
rowset_meta.txn_id(), code, msg)) {
LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id()
LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
return 1;
Expand Down
143 changes: 143 additions & 0 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
auto tmp_rowset = create_rowset(tablet_id, tc.start_version, tc.end_version, 100);
tmp_rowset.set_txn_id(txn_id);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res, txn_id);
commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -1486,6 +1487,7 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
output_rowsets.push_back(rowset);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + i);
commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + i);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

Expand Down Expand Up @@ -6768,4 +6770,145 @@ TEST(MetaServiceJobTest, DeleteJobForRelatedRowsetTest) {
}
}

// Regression test for the tablet-job guard in check_idempotent_for_txn_or_job.
//
// With enable_recycle_delete_rowset_key_check turned off, a commit_rowset request
// that carries a non-empty tablet_job_id is expected to go through check_job_existed.
// The guard used to read
//     tablet_job_id.empty() && !tablet_job_id.empty()
// which can never be true, so the job check was silently skipped. It now reads
//     !tablet_job_id.empty()
TEST(MetaServiceJobTest, CheckIdempotentWithTabletJobId) {
    auto meta_service = get_meta_service();
    auto* sync_point = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
        config::enable_recycle_delete_rowset_key_check = true;
    };
    // Turn the recycle-key check off so the request takes the else-if branch of
    // check_idempotent_for_txn_or_job, which is where the broken guard lived.
    config::enable_recycle_delete_rowset_key_check = false;
    sync_point->set_call_back("get_instance_id", [&](auto&& args) {
        auto* out = try_any_cast_ret<std::string>(args);
        out->first = instance_id;
        out->second = true;
    });
    sync_point->enable_processing();

    const int64_t table_id = 13440;
    const int64_t index_id = 13450;
    const int64_t partition_id = 13460;
    const int64_t tablet_id = 13470;
    const std::string job_id = "test_check_idempotent_job";

    create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id);

    // Issues one prepare_rowset / commit_rowset RPC for `meta` and hands back the
    // response. `with_job_id` decides whether tablet_job_id is set on the request.
    // The request is created through the response's arena exactly like the other
    // tests do; when there is no arena the request is heap-owned and freed here.
    auto send_rowset_rpc = [&](bool is_commit, const doris::RowsetMetaCloudPB& meta,
                               bool with_job_id) {
        brpc::Controller cntl;
        CreateRowsetResponse response;
        auto* arena = response.GetArena();
        auto* request = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
        request->mutable_rowset_meta()->CopyFrom(meta);
        if (with_job_id) {
            request->set_tablet_job_id(job_id);
        }
        if (is_commit) {
            meta_service->commit_rowset(&cntl, request, &response, nullptr);
        } else {
            meta_service->prepare_rowset(&cntl, request, &response, nullptr);
        }
        if (!arena) {
            delete request;
        }
        return response;
    };
    constexpr bool kPrepare = false;
    constexpr bool kCommit = true;

    // Step 1: seed the tablet with the rowsets the compaction will consume.
    {
        std::vector<doris::RowsetMetaCloudPB> inputs {create_rowset(tablet_id, 2, 2, 100),
                                                      create_rowset(tablet_id, 3, 3, 100)};
        for (auto& input : inputs) {
            input.set_resource_id(std::string(RESOURCE_ID));
        }
        insert_rowsets(meta_service->txn_kv().get(), table_id, index_id, partition_id, tablet_id,
                       inputs);
    }

    // Step 2: register a cumulative compaction job under job_id.
    {
        StartTabletJobResponse start_res;
        start_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
                             TabletCompactionJobPB::CUMULATIVE, start_res, {2, 3});
        ASSERT_EQ(start_res.status().code(), MetaServiceCode::OK);
    }

    // Step 3: prepare an output rowset, tagging the request with the job id.
    auto rowset_meta = create_rowset(tablet_id, 4, 4, 200);
    rowset_meta.set_job_id(job_id);
    rowset_meta.set_resource_id(std::string(RESOURCE_ID));
    {
        const auto prepare_res = send_rowset_rpc(kPrepare, rowset_meta, /*with_job_id=*/true);
        ASSERT_EQ(prepare_res.status().code(), MetaServiceCode::OK);
    }

    // Step 4: the job is still registered, so a commit carrying the job id must pass.
    {
        const auto commit_res = send_rowset_rpc(kCommit, rowset_meta, /*with_job_id=*/true);
        ASSERT_EQ(commit_res.status().code(), MetaServiceCode::OK)
                << "commit_rowset should succeed when job exists, msg="
                << commit_res.status().msg();
    }

    // Step 5: abort the compaction job so that it is no longer registered for the tablet.
    {
        FinishTabletJobResponse finish_res;
        finish_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
                              TabletCompactionJobPB::CUMULATIVE, finish_res,
                              FinishTabletJobRequest::ABORT, {2, 3});
        ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
    }

    // Step 6: prepare a second rowset. tablet_job_id is deliberately left off the
    // request so that the prepare-side job check does not reject it.
    auto rowset_meta2 = create_rowset(tablet_id, 5, 5, 200);
    rowset_meta2.set_job_id(job_id);
    rowset_meta2.set_resource_id(std::string(RESOURCE_ID));
    {
        const auto prepare_res = send_rowset_rpc(kPrepare, rowset_meta2, /*with_job_id=*/false);
        ASSERT_EQ(prepare_res.status().code(), MetaServiceCode::OK);
    }

    // Step 7: committing with the job id after the abort has to be rejected.
    // With the old always-false guard check_job_existed never ran and this
    // commit was wrongly accepted.
    {
        const auto commit_res = send_rowset_rpc(kCommit, rowset_meta2, /*with_job_id=*/true);
        ASSERT_EQ(commit_res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET)
                << "commit_rowset should fail with STALE_PREPARE_ROWSET when job is aborted, msg="
                << commit_res.status().msg();
    }

    // Step 8: the same commit without a tablet_job_id skips the job check and passes.
    {
        const auto commit_res = send_rowset_rpc(kCommit, rowset_meta2, /*with_job_id=*/false);
        ASSERT_EQ(commit_res.status().code(), MetaServiceCode::OK)
                << "commit_rowset without tablet_job_id should succeed, msg="
                << commit_res.status().msg();
    }
}

} // namespace doris::cloud
6 changes: 5 additions & 1 deletion cloud/test/meta_service_operation_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int6
int64_t partition_id, int64_t tablet_id);
extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id,
int64_t version, int num_rows);
extern void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
Expand Down Expand Up @@ -841,7 +843,7 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
LOG(INFO) << "Creating rowset for tablet_id=" << (tablet_id_base + i)
<< ", partition_id=" << partition_id << ", txn_id=" << txn_id
<< ", rowset=" << tmp_rowset.ShortDebugString();

prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down Expand Up @@ -1048,6 +1050,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
create_tablet(meta_service.get(), table_id, 1237, partition_id, tablet_id_base);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base, partition_id, 1, 100);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);

Expand Down Expand Up @@ -1273,6 +1276,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
create_tablet(meta_service.get(), table_id, 1238, partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(sub_txn_id, tablet_id_base + i, partition_id, 1, 100);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down
4 changes: 2 additions & 2 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int pa
return rowset;
}

static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res) {
void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res) {
brpc::Controller cntl;
auto arena = res.GetArena();
auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
Expand Down
4 changes: 4 additions & 0 deletions cloud/test/meta_service_versioned_read_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "meta-store/versioned_value.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/util.h"
#include "resource-manager/resource_manager.h"

namespace doris::cloud {
Expand All @@ -57,6 +58,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int6
int64_t partition_id, int64_t tablet_id);
extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id,
int64_t version, int num_rows);
extern void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
Expand Down Expand Up @@ -205,6 +208,7 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id, -1, 100);
CreateRowsetResponse res;
prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
Expand Down
27 changes: 24 additions & 3 deletions cloud/test/schema_kv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,13 +651,15 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
config::enable_recycle_delete_rowset_key_check = true;
};
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();
config::enable_recycle_delete_rowset_key_check = false;

int64_t db_id = 1000;

Expand All @@ -667,17 +669,36 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, table_id, index_id,
partition_id, tablet_id, next_rowset_id(), 1));

int64_t txn_id = -1;
std::string label = "test_abort_txn_label";
{
brpc::Controller cntl;
BeginTxnRequest req;
BeginTxnResponse res;
req.set_cloud_unique_id("test_cloud_unique_id");
auto* txn_info = req.mutable_txn_info();
txn_info->set_db_id(db_id);
txn_info->set_label(label);
txn_info->add_table_ids(table_id);
txn_info->set_timeout_ms(36000);
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
txn_id = res.txn_id();
LOG(INFO) << "Step 1: Transaction started, txn_id=" << txn_id;
}
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
auto committed_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
auto committed_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
std::string tmp_rowset_key, tmp_rowset_val;
// 0:instance_id 1:txn_id 2:tablet_id
meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
meta_rowset_tmp_key({instance_id, txn_id, tablet_id}, &tmp_rowset_key);
ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
CreateRowsetResponse res;
auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 2);
auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 2, 2);
prepare_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
Expand Down
Loading