diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 770ee9ca13a4f5..07bde01f8b9092 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -104,6 +104,8 @@ CONF_mInt32(instance_recycler_worker_pool_size, "32"); // Max number of delete tasks per batch when recycling objects. // Each task deletes up to 1000 files. Controls memory usage during large-scale deletion. CONF_Int32(recycler_max_tasks_per_batch, "1000"); +// Check object existence before deleting explicit files from object storage. +CONF_mBool(enable_delete_file_check_object_exists, "true"); // The worker pool size for http api `statistics_recycle` worker pool CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5"); CONF_Bool(enable_checker, "false"); diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp index 65c0a45240353e..afe02ebaa9ec16 100644 --- a/cloud/src/recycler/azure_obj_client.cpp +++ b/cloud/src/recycler/azure_obj_client.cpp @@ -240,6 +240,18 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, return {0}; } + if (option.check_exists_before_delete) { + ObjectMeta obj_meta; + for (const auto& key : keys) { + auto head_resp = head_object({.bucket = bucket, .key = key}, &obj_meta); + DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK) + << "bucket=" << bucket << " key=" << key << " error=" << head_resp.error_msg; + if (head_resp.ret != ObjectStorageResponse::OK) { + return head_resp; + } + } + } + // TODO(ByteYue) : use range to adate this code when compiler is ready // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations); auto begin = std::begin(keys); @@ -295,6 +307,21 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, } ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) { + if (config::enable_delete_file_check_object_exists) { + ObjectMeta obj_meta; + auto head_resp = head_object(path, &obj_meta); + DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK) + << "bucket=" << path.bucket << " key=" << path.key + << " error=" << head_resp.error_msg; + if (head_resp.ret != ObjectStorageResponse::OK) { + return head_resp; + } + } + + return delete_object_impl(path); +} + +ObjectStorageResponse AzureObjClient::delete_object_impl(ObjectStoragePathRef path) { return do_azure_client_call( [&]() { if (auto r = s3_put_rate_limit([&]() { @@ -333,4 +360,4 @@ ObjectStorageResponse AzureObjClient::abort_multipart_upload(ObjectStoragePathRe return delete_object(path); } -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/src/recycler/azure_obj_client.h b/cloud/src/recycler/azure_obj_client.h index bdf9b4eda752cc..8660704db8651e 100644 --- a/cloud/src/recycler/azure_obj_client.h +++ b/cloud/src/recycler/azure_obj_client.h @@ -56,6 +56,8 @@ class AzureObjClient final : public ObjStorageClient { const std::string& upload_id) override; private: + ObjectStorageResponse delete_object_impl(ObjectStoragePathRef path); + std::shared_ptr client_; }; diff --git a/cloud/src/recycler/obj_storage_client.h b/cloud/src/recycler/obj_storage_client.h index 358dfba4943fd7..d46b0fbdbfa103 100644 --- a/cloud/src/recycler/obj_storage_client.h +++ b/cloud/src/recycler/obj_storage_client.h @@ -60,6 +60,7 @@ class ObjectListIterator { class SimpleThreadPool; struct ObjClientOptions { bool prefetch {true}; + bool check_exists_before_delete {false}; std::shared_ptr executor; }; diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 0f2a7776fcc7fe..792cd11481c947 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -479,7 +479,11 @@ int S3Accessor::delete_files(const std::vector& paths) { keys.emplace_back(get_key(path)); } - return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool}) + return obj_client_ + ->delete_objects( + conf_.bucket, std::move(keys), + {.check_exists_before_delete = config::enable_delete_file_check_object_exists, + .executor = worker_pool}) .ret; } @@ -487,7 +491,7 @@ int S3Accessor::delete_file(const std::string& path) { LOG_INFO("delete file").tag("uri", to_uri(path)); int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret; static_assert(ObjectStorageResponse::OK == 0); - if (ret == ObjectStorageResponse::OK || ret == ObjectStorageResponse::NOT_FOUND) { + if (ret == ObjectStorageResponse::OK) { return 0; } return ret; diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp index 81d791cabac465..bd4a9d5009b51c 100644 --- a/cloud/src/recycler/s3_obj_client.cpp +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -259,13 +259,25 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, return {0}; } + if (option.check_exists_before_delete) { + ObjectMeta obj_meta; + for (const auto& key : keys) { + auto head_resp = head_object({.bucket = bucket, .key = key}, &obj_meta); + DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK) + << "bucket=" << bucket << " key=" << key << " error=" << head_resp.error_msg; + if (head_resp.ret != ObjectStorageResponse::OK) { + return head_resp; + } + } + } + Aws::S3::Model::DeleteObjectsRequest delete_request; delete_request.SetBucket(bucket); auto issue_delete = [&bucket, &delete_request, this](std::vector objects) -> int { if (objects.size() == 1) { - return delete_object({.bucket = bucket, .key = objects[0].GetKey()}).ret; + return delete_object_impl({.bucket = bucket, .key = objects[0].GetKey()}).ret; } Aws::S3::Model::Delete del; @@ -318,6 +330,21 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, } ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) { + if (config::enable_delete_file_check_object_exists) { + ObjectMeta obj_meta; + auto head_resp = head_object(path, &obj_meta); + DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK) + << "bucket=" << path.bucket << " key=" << path.key + << " error=" << head_resp.error_msg; + if (head_resp.ret != ObjectStorageResponse::OK) { + return head_resp; + } + } + + return delete_object_impl(path); +} + +ObjectStorageResponse S3ObjClient::delete_object_impl(ObjectStoragePathRef path) { Aws::S3::Model::DeleteObjectRequest request; request.WithBucket(path.bucket).WithKey(path.key); auto outcome = s3_put_rate_limit([&]() { diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h index e53564b6c9f708..bf9b90462dedc3 100644 --- a/cloud/src/recycler/s3_obj_client.h +++ b/cloud/src/recycler/s3_obj_client.h @@ -57,8 +57,10 @@ class S3ObjClient final : public ObjStorageClient { const std::string& upload_id) override; private: + ObjectStorageResponse delete_object_impl(ObjectStoragePathRef path); + std::shared_ptr s3_client_; std::string endpoint_; }; -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/test/s3_accessor_client_test.cpp b/cloud/test/s3_accessor_client_test.cpp index f6a8802cf22f46..66a58d86af9526 100644 --- a/cloud/test/s3_accessor_client_test.cpp +++ b/cloud/test/s3_accessor_client_test.cpp @@ -16,6 +16,9 @@ // under the License. #include +#include +#include +#include #include #include #include @@ -33,6 +36,7 @@ #include "cpp/sync_point.h" #include "recycler/recycler_service.h" #include "recycler/s3_accessor.h" +#include "recycler/s3_obj_client.h" using namespace doris; @@ -54,6 +58,72 @@ int main(int argc, char** argv) { namespace doris::cloud { +class MockDeleteS3Client : public Aws::S3::S3Client { +public: + Aws::S3::Model::HeadObjectOutcome HeadObject( + const Aws::S3::Model::HeadObjectRequest& request) const override { + ++head_count; + last_head_key = request.GetKey(); + if (exists) { + return Aws::S3::Model::HeadObjectOutcome(Aws::S3::Model::HeadObjectResult()); + } + + auto error = Aws::Client::AWSError(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, + false); + error.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND); + return Aws::S3::Model::HeadObjectOutcome(error); + } + + Aws::S3::Model::DeleteObjectOutcome DeleteObject( + const Aws::S3::Model::DeleteObjectRequest& request) const override { + ++delete_count; + last_delete_key = request.GetKey(); + return Aws::S3::Model::DeleteObjectOutcome(Aws::S3::Model::DeleteObjectResult()); + } + + mutable int head_count {0}; + mutable int delete_count {0}; + mutable std::string last_head_key; + mutable std::string last_delete_key; + bool exists {true}; +}; + +class S3ObjClientTest : public testing::Test { +protected: + static void SetUpTestSuite() { Aws::InitAPI(options); } + + static void TearDownTestSuite() { Aws::ShutdownAPI(options); } + +private: + static Aws::SDKOptions options; +}; + +Aws::SDKOptions S3ObjClientTest::options {}; + +TEST_F(S3ObjClientTest, DeleteObjectCheckExistsConfigTest) { + auto mock_s3_client = std::make_shared(); + S3ObjClient obj_client(mock_s3_client, "dummy-endpoint"); + const bool original_check_exists = config::enable_delete_file_check_object_exists; + + config::enable_delete_file_check_object_exists = true; + auto response = obj_client.delete_object({.bucket = "dummy-bucket", .key = "existing-key"}); + EXPECT_EQ(response.ret, ObjectStorageResponse::OK); + EXPECT_EQ(mock_s3_client->head_count, 1); + EXPECT_EQ(mock_s3_client->delete_count, 1); + EXPECT_EQ(mock_s3_client->last_head_key, "existing-key"); + EXPECT_EQ(mock_s3_client->last_delete_key, "existing-key"); + + mock_s3_client->exists = false; + config::enable_delete_file_check_object_exists = false; + response = obj_client.delete_object({.bucket = "dummy-bucket", .key = "missing-key"}); + EXPECT_EQ(response.ret, ObjectStorageResponse::OK); + EXPECT_EQ(mock_s3_client->head_count, 1); + EXPECT_EQ(mock_s3_client->delete_count, 2); + EXPECT_EQ(mock_s3_client->last_delete_key, "missing-key"); + + config::enable_delete_file_check_object_exists = original_check_exists; +} + #define GET_ENV_IF_DEFINED(var) \ ([]() -> std::string { \ const char* val = std::getenv(#var); \