Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ CONF_mInt32(instance_recycler_worker_pool_size, "32");
// Max number of delete tasks per batch when recycling objects.
// Each task deletes up to 1000 files. Controls memory usage during large-scale deletion.
CONF_Int32(recycler_max_tasks_per_batch, "1000");
// Check object existence before deleting explicit files from object storage.
CONF_mBool(enable_delete_file_check_object_exists, "true");
// The worker pool size for http api `statistics_recycle` worker pool
CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5");
CONF_Bool(enable_checker, "false");
Expand Down
29 changes: 28 additions & 1 deletion cloud/src/recycler/azure_obj_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
return {0};
}

if (option.check_exists_before_delete) {
ObjectMeta obj_meta;
for (const auto& key : keys) {
auto head_resp = head_object({.bucket = bucket, .key = key}, &obj_meta);
DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK)
<< "bucket=" << bucket << " key=" << key << " error=" << head_resp.error_msg;
if (head_resp.ret != ObjectStorageResponse::OK) {
return head_resp;
}
}
}

// TODO(ByteYue) : use range to adapt this code when compiler is ready
// auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
auto begin = std::begin(keys);
Expand Down Expand Up @@ -295,6 +307,21 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
}

// Deletes a single blob from Azure storage.
//
// When `config::enable_delete_file_check_object_exists` is on, a HEAD request
// is issued first; a non-OK HEAD result (e.g. the key is already gone) is
// returned to the caller — and DCHECKed in debug builds — instead of running
// the delete, so unexpected double-deletes are surfaced rather than silently
// succeeding.
ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
    if (!config::enable_delete_file_check_object_exists) {
        return delete_object_impl(path);
    }

    ObjectMeta meta;
    auto resp = head_object(path, &meta);
    DCHECK_EQ(resp.ret, ObjectStorageResponse::OK)
            << "bucket=" << path.bucket << " key=" << path.key << " error=" << resp.error_msg;
    if (resp.ret != ObjectStorageResponse::OK) {
        return resp;
    }
    return delete_object_impl(path);
}

ObjectStorageResponse AzureObjClient::delete_object_impl(ObjectStoragePathRef path) {
return do_azure_client_call(
[&]() {
if (auto r = s3_put_rate_limit([&]() {
Expand Down Expand Up @@ -333,4 +360,4 @@ ObjectStorageResponse AzureObjClient::abort_multipart_upload(ObjectStoragePathRe
return delete_object(path);
}

} // namespace doris::cloud
} // namespace doris::cloud
2 changes: 2 additions & 0 deletions cloud/src/recycler/azure_obj_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class AzureObjClient final : public ObjStorageClient {
const std::string& upload_id) override;

private:
ObjectStorageResponse delete_object_impl(ObjectStoragePathRef path);

std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client_;
};

Expand Down
1 change: 1 addition & 0 deletions cloud/src/recycler/obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ObjectListIterator {
class SimpleThreadPool;
// Per-call options shared by object-storage client operations.
struct ObjClientOptions {
    // NOTE(review): semantics not visible in this change — presumably enables
    // read-ahead/prefetching on list or get paths; confirm at call sites.
    bool prefetch {true};
    // When true, batch deletion HEADs every key first and fails fast (returning
    // the HEAD error) if any key does not exist, instead of deleting blindly.
    bool check_exists_before_delete {false};
    // Worker pool used to parallelize batched operations (e.g. bulk deletes).
    std::shared_ptr<SimpleThreadPool> executor;
};

Expand Down
8 changes: 6 additions & 2 deletions cloud/src/recycler/s3_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,19 @@ int S3Accessor::delete_files(const std::vector<std::string>& paths) {
keys.emplace_back(get_key(path));
}

return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool})
return obj_client_
->delete_objects(
conf_.bucket, std::move(keys),
{.check_exists_before_delete = config::enable_delete_file_check_object_exists,
.executor = worker_pool})
.ret;
}

int S3Accessor::delete_file(const std::string& path) {
LOG_INFO("delete file").tag("uri", to_uri(path));
int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret;
static_assert(ObjectStorageResponse::OK == 0);
if (ret == ObjectStorageResponse::OK || ret == ObjectStorageResponse::NOT_FOUND) {
if (ret == ObjectStorageResponse::OK) {
return 0;
}
return ret;
Expand Down
29 changes: 28 additions & 1 deletion cloud/src/recycler/s3_obj_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,25 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
return {0};
}

if (option.check_exists_before_delete) {
ObjectMeta obj_meta;
for (const auto& key : keys) {
auto head_resp = head_object({.bucket = bucket, .key = key}, &obj_meta);
DCHECK_EQ(head_resp.ret, ObjectStorageResponse::OK)
<< "bucket=" << bucket << " key=" << key << " error=" << head_resp.error_msg;
if (head_resp.ret != ObjectStorageResponse::OK) {
return head_resp;
}
}
}

Aws::S3::Model::DeleteObjectsRequest delete_request;
delete_request.SetBucket(bucket);

auto issue_delete = [&bucket, &delete_request,
this](std::vector<Aws::S3::Model::ObjectIdentifier> objects) -> int {
if (objects.size() == 1) {
return delete_object({.bucket = bucket, .key = objects[0].GetKey()}).ret;
return delete_object_impl({.bucket = bucket, .key = objects[0].GetKey()}).ret;
}

Aws::S3::Model::Delete del;
Expand Down Expand Up @@ -318,6 +330,21 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
}

// Deletes a single object from S3.
//
// If `config::enable_delete_file_check_object_exists` is enabled, the key is
// first verified with a HEAD request. A failed HEAD (object missing or other
// error) is propagated to the caller without attempting the delete, and fires
// a DCHECK in debug builds so unexpected deletes of absent keys are noticed.
ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
    if (config::enable_delete_file_check_object_exists) {
        ObjectMeta meta;
        auto head = head_object(path, &meta);
        DCHECK_EQ(head.ret, ObjectStorageResponse::OK)
                << "bucket=" << path.bucket << " key=" << path.key
                << " error=" << head.error_msg;
        const bool exists = (head.ret == ObjectStorageResponse::OK);
        if (!exists) {
            return head;
        }
    }

    return delete_object_impl(path);
}

ObjectStorageResponse S3ObjClient::delete_object_impl(ObjectStoragePathRef path) {
Aws::S3::Model::DeleteObjectRequest request;
request.WithBucket(path.bucket).WithKey(path.key);
auto outcome = s3_put_rate_limit([&]() {
Expand Down
4 changes: 3 additions & 1 deletion cloud/src/recycler/s3_obj_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class S3ObjClient final : public ObjStorageClient {
const std::string& upload_id) override;

private:
ObjectStorageResponse delete_object_impl(ObjectStoragePathRef path);

std::shared_ptr<Aws::S3::S3Client> s3_client_;
std::string endpoint_;
};

} // namespace doris::cloud
} // namespace doris::cloud
70 changes: 70 additions & 0 deletions cloud/test/s3_accessor_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <butil/guid.h>
#include <cpp/s3_rate_limiter.h>
Expand All @@ -33,6 +36,7 @@
#include "cpp/sync_point.h"
#include "recycler/recycler_service.h"
#include "recycler/s3_accessor.h"
#include "recycler/s3_obj_client.h"

using namespace doris;

Expand All @@ -54,6 +58,72 @@ int main(int argc, char** argv) {

namespace doris::cloud {

// Test double for Aws::S3::S3Client that records HEAD/DELETE traffic so the
// test can assert exactly which calls S3ObjClient issued and for which keys.
// `exists` toggles whether HeadObject reports the object as present.
class MockDeleteS3Client : public Aws::S3::S3Client {
public:
    // Counts the call and remembers the key. Returns a successful outcome
    // while `exists` is true; otherwise returns a 404 RESOURCE_NOT_FOUND
    // outcome, mimicking a missing object.
    Aws::S3::Model::HeadObjectOutcome HeadObject(
            const Aws::S3::Model::HeadObjectRequest& request) const override {
        ++head_count;
        last_head_key = request.GetKey();
        if (exists) {
            return Aws::S3::Model::HeadObjectOutcome(Aws::S3::Model::HeadObjectResult());
        }

        // Non-retryable error carrying an HTTP 404 response code.
        auto error = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
                                                              false);
        error.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
        return Aws::S3::Model::HeadObjectOutcome(error);
    }

    // Counts the call, remembers the key, and always reports success —
    // matching real S3, where deleting an absent key is still a 2xx.
    Aws::S3::Model::DeleteObjectOutcome DeleteObject(
            const Aws::S3::Model::DeleteObjectRequest& request) const override {
        ++delete_count;
        last_delete_key = request.GetKey();
        return Aws::S3::Model::DeleteObjectOutcome(Aws::S3::Model::DeleteObjectResult());
    }

    // `mutable` because the SDK declares these entry points `const`.
    mutable int head_count {0};
    mutable int delete_count {0};
    mutable std::string last_head_key;
    mutable std::string last_delete_key;
    bool exists {true};  // what HeadObject should claim about the object
};

// Fixture that initializes the AWS SDK once for the whole test suite and
// shuts it down afterwards, as required before constructing any S3 client.
class S3ObjClientTest : public testing::Test {
protected:
    static void SetUpTestSuite() { Aws::InitAPI(options); }

    static void TearDownTestSuite() { Aws::ShutdownAPI(options); }

private:
    // Shared SDK options; must outlive Init/Shutdown, hence suite-static.
    static Aws::SDKOptions options;
};

Aws::SDKOptions S3ObjClientTest::options {};

// Verifies that `enable_delete_file_check_object_exists` controls whether
// S3ObjClient::delete_object issues a HEAD request before the DELETE.
TEST_F(S3ObjClientTest, DeleteObjectCheckExistsConfigTest) {
    auto s3_mock = std::make_shared<MockDeleteS3Client>();
    S3ObjClient client(s3_mock, "dummy-endpoint");
    const bool saved_flag = config::enable_delete_file_check_object_exists;

    // Check enabled + object present: one HEAD followed by one DELETE,
    // both targeting the same key.
    config::enable_delete_file_check_object_exists = true;
    auto resp = client.delete_object({.bucket = "dummy-bucket", .key = "existing-key"});
    EXPECT_EQ(resp.ret, ObjectStorageResponse::OK);
    EXPECT_EQ(s3_mock->head_count, 1);
    EXPECT_EQ(s3_mock->delete_count, 1);
    EXPECT_EQ(s3_mock->last_head_key, "existing-key");
    EXPECT_EQ(s3_mock->last_delete_key, "existing-key");

    // Check disabled + object absent: the HEAD is skipped entirely and the
    // DELETE still succeeds.
    s3_mock->exists = false;
    config::enable_delete_file_check_object_exists = false;
    resp = client.delete_object({.bucket = "dummy-bucket", .key = "missing-key"});
    EXPECT_EQ(resp.ret, ObjectStorageResponse::OK);
    EXPECT_EQ(s3_mock->head_count, 1);  // unchanged — no HEAD was issued
    EXPECT_EQ(s3_mock->delete_count, 2);
    EXPECT_EQ(s3_mock->last_delete_key, "missing-key");

    config::enable_delete_file_check_object_exists = saved_flag;
}

#define GET_ENV_IF_DEFINED(var) \
([]() -> std::string { \
const char* val = std::getenv(#var); \
Expand Down
Loading