From 106f7a8c95ec75b4b2626e5d56c55a4b88d6dd3a Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Mon, 29 Jun 2026 10:42:53 +0000 Subject: [PATCH 1/8] revert: revert previous appendable upload checksum validation implementation --- .../connection_impl_appendable_upload_test.cc | 256 ------------------ .../internal/async/writer_connection_impl.cc | 13 +- .../async/writer_connection_resumed.cc | 9 +- 3 files changed, 5 insertions(+), 273 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index ef541d8f7b972..cb6244acf2a89 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -631,262 +631,6 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { next.first.set_value(true); } -TEST_F(AsyncConnectionImplAppendableTest, - StartAppendableObjectUploadWithChecksum) { - auto constexpr kRequestText = R"pb( - write_object_spec { - resource { - bucket: "projects/_/buckets/test-bucket" - name: "test-object" - content_type: "text/plain" - } - } - )pb"; - AsyncSequencer sequencer; - auto mock = std::make_shared(); - - google::storage::v2::Object initial_resource; - initial_resource.set_bucket("projects/_/buckets/test-bucket"); - initial_resource.set_name("test-object"); - initial_resource.set_size(1024); - initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC - - auto stream = std::make_unique(); - EXPECT_CALL(*stream, Start).WillOnce([&] { - return sequencer.PushBack("Start"); - }); - - EXPECT_CALL(*stream, Read) - .WillOnce([&, initial_resource] { - return sequencer.PushBack("Read(Takeover)") - .then([initial_resource](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - *response.mutable_resource() = initial_resource; - return absl::make_optional(std::move(response)); - }); - }) - .WillOnce([&, initial_resource] { - return sequencer.PushBack("Read(FinalObject)") - .then([initial_resource](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - *response.mutable_resource() = initial_resource; - response.mutable_resource()->set_size( - initial_resource.size() + 9); // "some data" size is 9 - return absl::make_optional(std::move(response)); - }); - }); - - EXPECT_CALL(*stream, Cancel).Times(1); - EXPECT_CALL(*stream, Finish).WillOnce([&] { - return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); - }); - - EXPECT_CALL(*stream, Write) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.state_lookup()); - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(StateLookup)"); - }) - .WillOnce( - [&](google::storage::v2::BidiWriteObjectRequest const& /*request*/, - grpc::WriteOptions wopt) { - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(data)"); - }) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.finish_write()); - EXPECT_TRUE(wopt.is_last_message()); - // Here we expect full checksums to be set because we had the resource - // in takeover. - EXPECT_TRUE(request.has_object_checksums()); - auto expected_crc = - google::cloud::storage_internal::ExtendCrc32c(12345, "some data"); - EXPECT_EQ(request.object_checksums().crc32c(), expected_crc); - return sequencer.PushBack("Write(Finalize)"); - }); - - EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] { - return std::unique_ptr(std::move(stream)); - }); - - internal::AutomaticallyCreatedBackgroundThreads pool(1); - // Enable CRC32C validation in options - auto options = TestOptions().set(true); - auto connection = MakeTestConnection(pool.cq(), mock, options); - - auto request = google::storage::v2::BidiWriteObjectRequest{}; - ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); - request.mutable_write_object_spec()->set_appendable(true); - - auto pending = connection->StartAppendableObjectUpload( - {std::move(request), connection->options()}); - - auto next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Start"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(StateLookup)"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(Takeover)"); - next.first.set_value(true); - - auto r = pending.get(); - ASSERT_STATUS_OK(r); - auto writer = *std::move(r); - - // Write some data. - auto w1 = writer->Write(storage::WritePayload("some data")); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(data)"); - next.first.set_value(true); - EXPECT_STATUS_OK(w1.get()); - - // Finalize the upload. - auto w2 = writer->Finalize({}); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(Finalize)"); - next.first.set_value(true); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(FinalObject)"); - next.first.set_value(true); - - auto response = w2.get(); - ASSERT_STATUS_OK(response); - - writer.reset(); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Finish"); - next.first.set_value(true); -} - -// TODO(#16174): Figure out why this test fails to compile in MSVC. -#ifndef _WIN32 - -TEST_F(AsyncConnectionImplAppendableTest, - ResumeAppendableObjectUploadWithChecksum) { - auto constexpr kRequestText = R"pb( - append_object_spec { object: "test-object" } - )pb"; - AsyncSequencer sequencer; - auto mock = std::make_shared(); - - constexpr std::int64_t kPersistedSize = 16384; - constexpr std::uint32_t kPersistedCrc = 12345; - - auto stream = std::make_unique(); - EXPECT_CALL(*stream, Start).WillOnce([&] { - return sequencer.PushBack("Start"); - }); - - EXPECT_CALL(*stream, Read) - .WillOnce([&] { - return sequencer.PushBack("Read(PersistedSize)").then([](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - response.set_persisted_size(kPersistedSize); - response.mutable_persisted_data_checksums()->set_crc32c( - kPersistedCrc); - return absl::make_optional(std::move(response)); - }); - }) - .WillOnce([&] { - return sequencer.PushBack("Read(FinalObject)").then([](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - auto object = google::storage::v2::Object{}; - object.set_bucket("projects/_/buckets/test-bucket"); - object.set_name("test-object"); - object.set_size(kPersistedSize + 9); - *response.mutable_resource() = std::move(object); - return absl::make_optional(std::move(response)); - }); - }); - - EXPECT_CALL(*stream, Cancel).Times(1); - EXPECT_CALL(*stream, Finish).WillOnce([&] { - return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); - }); - - EXPECT_CALL(*stream, Write) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.state_lookup()); - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(StateLookup)"); - }) - .WillOnce( - [&](google::storage::v2::BidiWriteObjectRequest const& /*request*/, - grpc::WriteOptions wopt) { - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(data)"); - }) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.finish_write()); - EXPECT_TRUE(wopt.is_last_message()); - EXPECT_TRUE(request.has_object_checksums()); - EXPECT_EQ(request.object_checksums().crc32c(), 2901820631); - return sequencer.PushBack("Write(Finalize)"); - }); - - EXPECT_CALL(*mock, AsyncBidiWriteObject) - .WillOnce([&](auto const&, auto, auto) { - return std::unique_ptr(std::move(stream)); - }); - - internal::AutomaticallyCreatedBackgroundThreads pool(1); - auto options = TestOptions().set(true); - auto connection = MakeTestConnection(pool.cq(), mock, options); - - auto request = google::storage::v2::BidiWriteObjectRequest{}; - ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); - auto pending = connection->ResumeAppendableObjectUpload( - {std::move(request), connection->options()}); - - auto next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Start"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(StateLookup)"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(PersistedSize)"); - next.first.set_value(true); - - auto r = pending.get(); - ASSERT_STATUS_OK(r); - auto writer = *std::move(r); - - auto w1 = writer->Write(storage::WritePayload("some data")); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(data)"); - next.first.set_value(true); - EXPECT_STATUS_OK(w1.get()); - - auto w2 = writer->Finalize({}); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(Finalize)"); - next.first.set_value(true); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(FinalObject)"); - next.first.set_value(true); - - auto response = w2.get(); - ASSERT_STATUS_OK(response); - - writer.reset(); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Finish"); - next.first.set_value(true); -} - -#endif - } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 92b3990b3c586..8cc40a4f2ff77 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -146,15 +146,10 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { auto p = WritePayloadImpl::GetImpl(payload); auto size = p.size(); - auto action = PartialUpload::kFinalizeWithChecksum; - if (request_.has_append_object_spec() || - request_.write_object_spec().appendable()) { - if (!absl::holds_alternative( - persisted_state_) && - !persisted_data_checksums_.has_value()) { - action = PartialUpload::kFinalize; - } - } + auto action = request_.has_append_object_spec() || + request_.write_object_spec().appendable() + ? PartialUpload::kFinalize + : PartialUpload::kFinalizeWithChecksum; auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); return coro->Start().then([coro, size, this](auto f) mutable { diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 6d96e523a25d8..017b4aaf231c5 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -508,15 +508,8 @@ class AsyncWriterConnectionResumedState } // Recreate the underlying stream if still active. - auto hash = hash_function_; - if (checksums && checksums->has_crc32c()) { - hash = std::make_shared< - ::google::cloud::storage::internal::Crc32cHashFunction>( - checksums->crc32c(), persisted_offset); - } - impl_ = std::make_unique( - options_, initial_request_, std::move(res->stream), std::move(hash), + options_, initial_request_, std::move(res->stream), hash_function_, persisted_offset, false, checksums); // OnQuery will restart the WriteLoop if necessary. OnQuery(std::move(lk), persisted_offset); From 915cef588f417f3e1dfd709aef21c9a98bef26bf Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Mon, 29 Jun 2026 13:01:33 +0000 Subject: [PATCH 2/8] feat(storage): implement full object checksum validation for appendable uploads --- google/cloud/storage/async/writer.cc | 13 +- google/cloud/storage/async/writer.h | 12 +- .../cloud/storage/async/writer_connection.h | 6 + .../storage/internal/async/connection_impl.cc | 11 +- .../async/writer_connection_buffered.cc | 29 ++++- .../internal/async/writer_connection_impl.cc | 36 +++++- .../internal/async/writer_connection_impl.h | 8 +- .../async/writer_connection_impl_test.cc | 62 +++++++++ .../async/writer_connection_resumed.cc | 122 +++++++++++++++++- .../async/writer_connection_resumed_test.cc | 112 ++++++++++++++++ .../async/writer_connection_tracing.cc | 14 +- google/cloud/storage/internal/hash_function.h | 7 + .../storage/internal/hash_function_impl.h | 16 +++ .../storage/testing/mock_hash_function.h | 3 + 14 files changed, 429 insertions(+), 22 deletions(-) diff --git a/google/cloud/storage/async/writer.cc b/google/cloud/storage/async/writer.cc index 5195a55ef4f05..e030269392f95 100644 --- a/google/cloud/storage/async/writer.cc +++ b/google/cloud/storage/async/writer.cc @@ -70,21 +70,22 @@ future> AsyncWriter::Write(AsyncToken token, } future> AsyncWriter::Finalize( - AsyncToken token, WritePayload payload) { + AsyncToken token, WritePayload payload, + absl::optional const& expected_checksum) { if (!impl_) return StreamError(GCP_ERROR_INFO()); auto t = storage_internal::MakeAsyncToken(impl_.get()); if (token != t) { return TokenError(GCP_ERROR_INFO()); } - return impl_->Finalize(std::move(payload)).then([impl = impl_](auto f) { - return f.get(); - }); + return impl_->Finalize(std::move(payload), expected_checksum) + .then([impl = impl_](auto f) { return f.get(); }); } future> AsyncWriter::Finalize( - AsyncToken token) { - return Finalize(std::move(token), WritePayload{}); + AsyncToken token, + absl::optional const& expected_checksum) { + return Finalize(std::move(token), WritePayload{}, expected_checksum); } future AsyncWriter::Flush() { diff --git a/google/cloud/storage/async/writer.h b/google/cloud/storage/async/writer.h index f39f8f5bee3db..8bbd474e4a370 100644 --- a/google/cloud/storage/async/writer.h +++ b/google/cloud/storage/async/writer.h @@ -18,6 +18,7 @@ #include "google/cloud/storage/async/token.h" #include "google/cloud/storage/async/write_payload.h" #include "google/cloud/storage/async/writer_connection.h" +#include "google/cloud/storage/hashing_options.h" #include "google/cloud/future.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" @@ -115,13 +116,18 @@ class AsyncWriter { future> Write(AsyncToken token, WritePayload payload); /// Finalize the upload with the existing data. - future> Finalize(AsyncToken token); + future> Finalize( + AsyncToken token, + absl::optional const& expected_checksum = + absl::nullopt); /** * Upload @p payload and then finalize the upload. */ - future> Finalize(AsyncToken token, - WritePayload payload); + future> Finalize( + AsyncToken token, WritePayload payload, + absl::optional const& expected_checksum = + absl::nullopt); /** * Flush any buffered data to the service. diff --git a/google/cloud/storage/async/writer_connection.h b/google/cloud/storage/async/writer_connection.h index 3ab827623b99e..08ce48d07bd04 100644 --- a/google/cloud/storage/async/writer_connection.h +++ b/google/cloud/storage/async/writer_connection.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_WRITER_CONNECTION_H #include "google/cloud/storage/async/write_payload.h" +#include "google/cloud/storage/hashing_options.h" #include "google/cloud/storage/object_metadata.h" #include "google/cloud/future.h" #include "google/cloud/rpc_metadata.h" @@ -112,6 +113,11 @@ class AsyncWriterConnection { /// Finalizes an upload. virtual future> Finalize( WritePayload) = 0; + virtual future> Finalize( + WritePayload p, + absl::optional const& /*expected_checksum*/) { + return Finalize(std::move(p)); + } /// Uploads some data to the service and flushes the value. virtual future Flush(WritePayload payload) = 0; diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 303e420f91e86..aca0843d1bc9d 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -137,7 +137,16 @@ StatusOr> MakeAppendableWriter( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); - hash = CreateHashFunction(*current); + if (current->get() && + rpc->first_response.has_persisted_data_checksums() && + rpc->first_response.persisted_data_checksums().has_crc32c()) { + hash = std::make_shared< + ::google::cloud::storage::internal::Crc32cHashFunction>( + rpc->first_response.persisted_data_checksums().crc32c(), + persisted_size); + } else { + hash = CreateHashFunction(*current); + } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( rpc->first_response.persisted_data_checksums()) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index d00ead1f2cf40..18713a73062da 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -106,6 +106,11 @@ class AsyncWriterConnectionBufferedState return Impl(std::unique_lock(mu_))->PersistedState(); } + absl::optional PersistedChecksums() + const { + return Impl(std::unique_lock(mu_))->PersistedChecksums(); + } + future Write(storage::WritePayload const& p) { std::unique_lock lk(mu_); resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); @@ -114,7 +119,15 @@ class AsyncWriterConnectionBufferedState future> Finalize( storage::WritePayload const& p) { + return Finalize(p, absl::nullopt); + } + future> Finalize( + storage::WritePayload const& p, + absl::optional const& expected_checksum) { std::unique_lock lk(mu_); + if (expected_checksum.has_value()) { + expected_checksum_ = expected_checksum; + } resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); finalize_ = true; HandleNewData(std::move(lk), true); @@ -246,7 +259,7 @@ class AsyncWriterConnectionBufferedState auto impl = Impl(lk); lk.unlock(); // Finalize with an empty payload. - (void)impl->Finalize(storage::WritePayload{}) + (void)impl->Finalize(storage::WritePayload{}, expected_checksum_) .then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) return self->OnFinalize(f.get()); }); @@ -636,6 +649,7 @@ class AsyncWriterConnectionBufferedState // Retrieve the future in the constructor, as some operations reset // finalized_. future> finalized_future_; + absl::optional expected_checksum_; // The result of calling `Close()`. Note that only one such call is ever // made. @@ -763,13 +777,24 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection { return state_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return state_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { return state_->Write(std::move(p)); } future> Finalize( storage::WritePayload p) override { - return state_->Finalize(std::move(p)); + return Finalize(std::move(p), absl::nullopt); + } + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { + return state_->Finalize(std::move(p), expected_checksum); } future Flush(storage::WritePayload p) override { diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 8cc40a4f2ff77..387580bf383be 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -140,22 +140,48 @@ future AsyncWriterConnectionImpl::Write(storage::WritePayload payload) { } future> -AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { +AsyncWriterConnectionImpl::Finalize( + storage::WritePayload payload, + absl::optional const& expected_checksum) { auto write = MakeRequest(); write.set_finish_write(true); auto p = WritePayloadImpl::GetImpl(payload); auto size = p.size(); + + if (p.empty() && expected_checksum.has_value()) { + auto const actual = hash_function_->Finish().crc32c; + if (!actual.empty() && expected_checksum->value() != actual) { + return make_ready_future(StatusOr( + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum->value() + " got " + actual, + GCP_ERROR_INFO()))); + } + } + auto action = request_.has_append_object_spec() || request_.write_object_spec().appendable() ? PartialUpload::kFinalize : PartialUpload::kFinalizeWithChecksum; auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); - return coro->Start().then([coro, size, this](auto f) mutable { - coro.reset(); // breaks the cycle between the completion queue and coro - return OnFinalUpload(size, f.get()); - }); + return coro->Start().then( + [coro, size, expected_checksum, this](auto f) mutable { + coro.reset(); // breaks the cycle between the completion queue and coro + auto res = f.get(); + if (res.ok() && *res && expected_checksum.has_value()) { + auto const actual = hash_function_->Finish().crc32c; + if (!actual.empty() && expected_checksum->value() != actual) { + return make_ready_future(StatusOr( + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum->value() + " got " + actual, + GCP_ERROR_INFO()))); + } + } + return OnFinalUpload(size, std::move(res)); + }); } future AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) { diff --git a/google/cloud/storage/internal/async/writer_connection_impl.h b/google/cloud/storage/internal/async/writer_connection_impl.h index cedfd1005bf48..3550a0cb1ebae 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.h +++ b/google/cloud/storage/internal/async/writer_connection_impl.h @@ -69,7 +69,13 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection { future Write(storage::WritePayload payload) override; future> Finalize( - storage::WritePayload) override; + storage::WritePayload payload) override { + return Finalize(std::move(payload), absl::nullopt); + } + future> Finalize( + storage::WritePayload payload, + absl::optional const& expected_checksum) + override; future Flush(storage::WritePayload payload) override; future Close(storage::WritePayload payload) override; future> Query() override; diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index 03ecf7699cc2c..240498c65f7a5 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -904,6 +904,68 @@ TEST(AsyncWriterConnectionTest, CloseError) { EXPECT_THAT(response.get(), StatusIs(PermanentError().code())); } +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchImmediate) { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Finish).WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("AAAAAA==")); + EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss)); +} + +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMatchImmediate) { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*mock, Write).WillOnce([](Request const&, grpc::WriteOptions) { + return make_ready_future(true); + }); + EXPECT_CALL(*mock, Read).WillOnce([] { + return make_ready_future(absl::make_optional(MakeTestResponse())); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("ImIEBA==")); + EXPECT_THAT(response.get(), IsOk()); +} + +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchOnComplete) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*mock, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write"); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); + EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload(std::string(128, 'A')), storage::Crc32cChecksumValue("AAAAAA==")); + auto next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Write"); + next.first.set_value(true); + EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss)); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 017b4aaf231c5..56d143707471c 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/internal/async/write_payload_impl.h" #include "google/cloud/storage/internal/async/writer_connection_impl.h" #include "google/cloud/storage/internal/hash_function_impl.h" +#include "google/cloud/storage/internal/crc32c.h" #include "google/cloud/future.h" #include "google/cloud/internal/make_status.h" #include "google/cloud/status.h" @@ -23,6 +24,7 @@ #include "absl/strings/cord.h" #include #include +#include #include #include #include @@ -85,6 +87,11 @@ class AsyncWriterConnectionResumedState } else { buffer_offset_ = absl::get(state); } + if (hash_function_) { + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_] = *crc; + } + } if (first_response_.has_write_handle()) { latest_write_handle_ = first_response_.write_handle(); } else if (initial_request_.has_append_object_spec() && @@ -116,6 +123,11 @@ class AsyncWriterConnectionResumedState return Impl(std::unique_lock(mu_))->PersistedState(); } + absl::optional PersistedChecksums() + const { + return Impl(std::unique_lock(mu_))->PersistedChecksums(); + } + future Write(storage::WritePayload const& p) { std::unique_lock lk(mu_); resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); @@ -124,7 +136,15 @@ class AsyncWriterConnectionResumedState future> Finalize( storage::WritePayload const& p) { + return Finalize(p, absl::nullopt); + } + future> Finalize( + storage::WritePayload const& p, + absl::optional const& expected_checksum) { std::unique_lock lk(mu_); + if (expected_checksum.has_value()) { + expected_checksum_ = expected_checksum; + } resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); finalize_ = true; HandleNewData(std::move(lk)); @@ -255,13 +275,24 @@ class AsyncWriterConnectionResumedState // If another thread initiated FinalizeStep concurrently, just return. return; } + if (expected_checksum_.has_value() && hash_function_) { + auto const actual = hash_function_->Finish().crc32c; + if (!actual.empty() && expected_checksum_->value() != actual) { + SetError(std::move(lk), + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum_->value() + " got " + actual, + GCP_ERROR_INFO())); + return; + } + } // Mark that we are starting the finalization process. state_ = State::kWriting; finalizing_ = true; auto impl = Impl(lk); lk.unlock(); // Finalize with an empty payload. - (void)impl->Finalize(storage::WritePayload{}) + (void)impl->Finalize(storage::WritePayload{}, expected_checksum_) .then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) return self->OnFinalize(f.get()); }); @@ -288,6 +319,18 @@ class AsyncWriterConnectionResumedState void OnClose(Status result) { if (!result.ok()) return Resume(std::move(result)); + auto checksums = impl_->PersistedChecksums(); + if (checksums && checksums->has_crc32c()) { + std::unique_lock lk(mu_); + auto it = crc32c_history_.find(buffer_offset_); + if (it != crc32c_history_.end() && it->second != checksums->crc32c()) { + SetError(std::move(lk), + google::cloud::internal::DataLossError( + "client/server checksum mismatch at Close", + GCP_ERROR_INFO())); + return; + } + } SetClosed(std::unique_lock(mu_), std::move(result)); } @@ -308,12 +351,38 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; + if (hash_function_) { + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_ + write_offset_] = *crc; + } + } auto impl = Impl(lk); lk.unlock(); impl->Query().then([result, w = WeakFromThis()](auto f) { auto self = w.lock(); if (!self) return; - self->OnQuery(f.get()); + auto query_res = f.get(); + if (!query_res.ok()) { + self->OnQuery(std::move(query_res)); + self->SetFlushed(std::unique_lock(self->mu_), + std::move(result)); + return; + } + auto const persisted_size = *query_res; + auto checksums = self->impl_->PersistedChecksums(); + if (checksums && checksums->has_crc32c()) { + std::unique_lock lk(self->mu_); + auto it = self->crc32c_history_.find(persisted_size); + if (it != self->crc32c_history_.end() && + it->second != checksums->crc32c()) { + self->SetError(std::move(lk), + google::cloud::internal::DataLossError( + "client/server checksum mismatch at Flush", + GCP_ERROR_INFO())); + return; + } + } + self->OnQuery(std::move(query_res)); self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); }); @@ -354,6 +423,37 @@ class AsyncWriterConnectionResumedState MakeFastForwardError(buffer_offset_, persisted_size, GCP_ERROR_INFO())); } + if (hash_function_ && hash_function_->CurrentCrc32c().has_value()) { + auto it = crc32c_history_.find(persisted_size); + if (it != crc32c_history_.end()) { + hash_function_->RestoreCrc32c(it->second, persisted_size); + } else if (!crc32c_history_.empty()) { + auto upper = crc32c_history_.upper_bound(persisted_size); + if (upper != crc32c_history_.begin()) { + --upper; + auto const y = upper->first; + auto const crc_y = upper->second; + hash_function_->RestoreCrc32c(crc_y, y); + if (y < persisted_size) { + auto const slice_offset = + static_cast(y - buffer_offset_); + auto const slice_len = + static_cast(persisted_size - y); + if (slice_offset + slice_len <= resend_buffer_.size()) { + auto slice = resend_buffer_.Subcord(slice_offset, slice_len); + (void)hash_function_->Update(y, slice, Crc32c(slice)); + } + } + if (auto current = hash_function_->CurrentCrc32c()) { + crc32c_history_[persisted_size] = *current; + } + } + } + auto purge_it = crc32c_history_.upper_bound(persisted_size); + crc32c_history_.erase(purge_it, crc32c_history_.end()); + crc32c_history_.erase(crc32c_history_.begin(), + crc32c_history_.lower_bound(persisted_size)); + } resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; if (state_ == State::kResuming) { @@ -397,6 +497,9 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_ + write_offset_] = *crc; + } state_ = State::kIdle; return StartWriting(std::move(lk)); } @@ -692,6 +795,8 @@ class AsyncWriterConnectionResumedState // Retrieve the future in the constructor, as some operations reset // finalized_. future> finalized_future_; + absl::optional expected_checksum_; + std::map crc32c_history_; // The result of calling `Close()`. Note that only one such call is ever // made. @@ -829,13 +934,24 @@ class AsyncWriterConnectionResumed : public storage::AsyncWriterConnection { return state_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return state_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { return state_->Write(std::move(p)); } future> Finalize( storage::WritePayload p) override { - return state_->Finalize(std::move(p)); + return state_->Finalize(std::move(p), absl::nullopt); + } + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { + return state_->Finalize(std::move(p), expected_checksum); } future Flush(storage::WritePayload p) override { diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index e510ddad65e3c..541d8c61475ac 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -32,6 +32,7 @@ namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::storage::testing::MockHashFunction; using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::storage_mocks::MockAsyncWriterConnection; @@ -40,6 +41,8 @@ using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; using ::testing::_; +using ::testing::An; +using ::testing::AnyNumber; using ::testing::Eq; using ::testing::ResultOf; using ::testing::Return; @@ -1198,6 +1201,115 @@ TEST(WriterConnectionResumed, CloseFailsAndResumeSucceedsAndFinalized) { EXPECT_STATUS_OK(close.get()); } +TEST(WriterConnectionResumed, FinalizeExpectedChecksumMismatch) { + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(MakePersistedState(0))); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(100)); + EXPECT_CALL(*hash, Finish).WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + MockFactory mock_factory; + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, hash, + first_response, Options{}); + auto finalize = connection->Finalize({}, storage::Crc32cChecksumValue("AAAAAA==")); + EXPECT_THAT(finalize.get(), StatusIs(StatusCode::kDataLoss)); +} + +TEST(WriterConnectionResumed, RollbackChecksumOnResume) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const&) { + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c) + .WillOnce(Return(100)) + .WillOnce(Return(200)) + .WillRepeatedly(Return(200)); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(AnyNumber()); + EXPECT_CALL(*hash, RestoreCrc32c(_, _)).Times(AnyNumber()); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + google::storage::v2::BidiWriteObjectResponse query_resp1; + query_resp1.set_persisted_size(10); + google::storage::v2::BidiWriteObjectResponse query_resp2; + query_resp2.set_persisted_size(20); + EXPECT_CALL(mock_factory, Call(_)).WillOnce([&](auto const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + return sequencer.PushBack("Factory").then([r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write).WillRepeatedly([&](auto const&, auto) { + return sequencer.PushBack("StreamWrite").then([](auto) { return true; }); + }); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, query_resp1]() { + return sequencer.PushBack("StreamRead").then([query_resp1](auto) { + return absl::make_optional(query_resp1); + }); + }) + .WillRepeatedly([&, query_resp2]() { + return sequencer.PushBack("StreamRead").then([query_resp2](auto) { + return absl::make_optional(query_resp2); + }); + }); + EXPECT_CALL(*mock_stream_ptr, Cancel).Times(AnyNumber()); + EXPECT_CALL(*mock_stream_ptr, Finish).WillRepeatedly([] { + return make_ready_future(Status{}); + }); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, hash, + first_response, Options{}); + auto write = connection->Write(TestPayload(20)); + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead"); + next.first.set_value(true); + + EXPECT_STATUS_OK(write.get()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_tracing.cc b/google/cloud/storage/internal/async/writer_connection_tracing.cc index 96ad896a0369f..897e4c895944c 100644 --- a/google/cloud/storage/internal/async/writer_connection_tracing.cc +++ b/google/cloud/storage/internal/async/writer_connection_tracing.cc @@ -60,6 +60,11 @@ class AsyncWriterConnectionTracing : public storage::AsyncWriterConnection { return impl_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return impl_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { internal::OTelScope scope(span_); auto size = static_cast(p.size()); @@ -81,9 +86,16 @@ class AsyncWriterConnectionTracing : public storage::AsyncWriterConnection { future> Finalize( storage::WritePayload p) override { + return Finalize(std::move(p), absl::nullopt); + } + + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { internal::OTelScope scope(span_); auto size = static_cast(p.size()); - return impl_->Finalize(std::move(p)) + return impl_->Finalize(std::move(p), expected_checksum) .then([count = ++sent_count_, span = span_, size](auto f) { span->AddEvent( "gl-cpp.finalize", diff --git a/google/cloud/storage/internal/hash_function.h b/google/cloud/storage/internal/hash_function.h index da7699353739f..90038fad26578 100644 --- a/google/cloud/storage/internal/hash_function.h +++ b/google/cloud/storage/internal/hash_function.h @@ -21,6 +21,7 @@ #include "google/cloud/status.h" #include "absl/strings/cord.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include #include @@ -79,6 +80,12 @@ class HashFunction { virtual Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) = 0; + virtual absl::optional CurrentCrc32c() const { + return absl::nullopt; + } + virtual void RestoreCrc32c(std::uint32_t /*crc32c*/, + std::int64_t /*offset*/) {} + /** * Compute the final hash values. */ diff --git a/google/cloud/storage/internal/hash_function_impl.h b/google/cloud/storage/internal/hash_function_impl.h index 6b028d3cf0bab..ff3e0a0d6be0c 100644 --- a/google/cloud/storage/internal/hash_function_impl.h +++ b/google/cloud/storage/internal/hash_function_impl.h @@ -62,6 +62,15 @@ class CompositeFunction : public HashFunction { std::uint32_t buffer_crc) override; Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) override; + absl::optional CurrentCrc32c() const override { + auto c = a_->CurrentCrc32c(); + if (c.has_value()) return c; + return b_->CurrentCrc32c(); + } + void RestoreCrc32c(std::uint32_t crc32c, std::int64_t offset) override { + a_->RestoreCrc32c(crc32c, offset); + b_->RestoreCrc32c(crc32c, offset); + } HashValues Finish() override; private: @@ -120,6 +129,13 @@ class Crc32cHashFunction : public HashFunction { std::uint32_t buffer_crc) override; Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) override; + absl::optional CurrentCrc32c() const override { + return current_; + } + void RestoreCrc32c(std::uint32_t crc32c, std::int64_t offset) override { + current_ = crc32c; + minimum_offset_ = offset; + } HashValues Finish() override; private: diff --git a/google/cloud/storage/testing/mock_hash_function.h b/google/cloud/storage/testing/mock_hash_function.h index e749019f8093a..90a4d741d8515 100644 --- a/google/cloud/storage/testing/mock_hash_function.h +++ b/google/cloud/storage/testing/mock_hash_function.h @@ -33,6 +33,9 @@ class MockHashFunction : public storage::internal::HashFunction { (override)); MOCK_METHOD(Status, Update, (std::int64_t, absl::Cord const&, std::uint32_t), (override)); + MOCK_METHOD(absl::optional, CurrentCrc32c, (), + (const, override)); + MOCK_METHOD(void, RestoreCrc32c, (std::uint32_t, std::int64_t), (override)); MOCK_METHOD(storage::internal::HashValues, Finish, (), (override)); }; From 9070d9c665506f91fe207fbc3de3704c53b790ca Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 06:52:52 +0000 Subject: [PATCH 3/8] style(storage): format code with clang-format --- .../async/writer_connection_impl_test.cc | 18 ++++++++++++------ .../async/writer_connection_resumed.cc | 12 +++++------- .../async/writer_connection_resumed_test.cc | 19 ++++++++++++------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index 240498c65f7a5..2fda3bb782570 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -911,11 +911,13 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchImmediate) { return make_ready_future(Status{}); }); auto hash = std::make_shared(); - EXPECT_CALL(*hash, Finish).WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + EXPECT_CALL(*hash, Finish) + .WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); auto tested = std::make_unique( TestOptions(), MakeRequest(), std::move(mock), hash, 1024); - auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("AAAAAA==")); + auto response = tested->Finalize(WritePayload{}, + storage::Crc32cChecksumValue("AAAAAA==")); EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss)); } @@ -932,11 +934,13 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMatchImmediate) { return make_ready_future(absl::make_optional(MakeTestResponse())); }); auto hash = std::make_shared(); - EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + EXPECT_CALL(*hash, Finish) + .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); auto tested = std::make_unique( TestOptions(), MakeRequest(), std::move(mock), hash, 1024); - auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("ImIEBA==")); + auto response = tested->Finalize(WritePayload{}, + storage::Crc32cChecksumValue("ImIEBA==")); EXPECT_THAT(response.get(), IsOk()); } @@ -955,11 +959,13 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchOnComplete) { }); auto hash = std::make_shared(); EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); - EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + EXPECT_CALL(*hash, Finish) + .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); auto tested = std::make_unique( TestOptions(), MakeRequest(), std::move(mock), hash, 1024); - auto response = tested->Finalize(WritePayload(std::string(128, 'A')), storage::Crc32cChecksumValue("AAAAAA==")); + auto response = tested->Finalize(WritePayload(std::string(128, 'A')), + storage::Crc32cChecksumValue("AAAAAA==")); auto next = sequencer.PopFrontWithName(); ASSERT_THAT(next.second, "Write"); next.first.set_value(true); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 56d143707471c..31d93ed2816d8 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -15,8 +15,8 @@ #include "google/cloud/storage/internal/async/writer_connection_resumed.h" #include "google/cloud/storage/internal/async/write_payload_impl.h" #include "google/cloud/storage/internal/async/writer_connection_impl.h" -#include "google/cloud/storage/internal/hash_function_impl.h" #include "google/cloud/storage/internal/crc32c.h" +#include "google/cloud/storage/internal/hash_function_impl.h" #include "google/cloud/future.h" #include "google/cloud/internal/make_status.h" #include "google/cloud/status.h" @@ -324,10 +324,9 @@ class AsyncWriterConnectionResumedState std::unique_lock lk(mu_); auto it = crc32c_history_.find(buffer_offset_); if (it != crc32c_history_.end() && it->second != checksums->crc32c()) { - SetError(std::move(lk), - google::cloud::internal::DataLossError( - "client/server checksum mismatch at Close", - GCP_ERROR_INFO())); + SetError(std::move(lk), google::cloud::internal::DataLossError( + "client/server checksum mismatch at Close", + GCP_ERROR_INFO())); return; } } @@ -437,8 +436,7 @@ class AsyncWriterConnectionResumedState if (y < persisted_size) { auto const slice_offset = static_cast(y - buffer_offset_); - auto const slice_len = - static_cast(persisted_size - y); + auto const slice_len = static_cast(persisted_size - y); if (slice_offset + slice_len <= resend_buffer_.size()) { auto slice = resend_buffer_.Subcord(slice_offset, slice_len); (void)hash_function_->Update(y, slice, Crc32c(slice)); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 541d8c61475ac..fe00d804003b8 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -1206,16 +1206,19 @@ TEST(WriterConnectionResumed, FinalizeExpectedChecksumMismatch) { auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; auto first_response = google::storage::v2::BidiWriteObjectResponse{}; - EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); auto hash = std::make_shared(); EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(100)); - EXPECT_CALL(*hash, Finish).WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + EXPECT_CALL(*hash, Finish) + .WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); MockFactory mock_factory; auto connection = MakeWriterConnectionResumed( mock_factory.AsStdFunction(), std::move(mock), initial_request, hash, first_response, Options{}); - auto finalize = connection->Finalize({}, storage::Crc32cChecksumValue("AAAAAA==")); + auto finalize = + connection->Finalize({}, storage::Crc32cChecksumValue("AAAAAA==")); EXPECT_THAT(finalize.get(), StatusIs(StatusCode::kDataLoss)); } @@ -1225,7 +1228,8 @@ TEST(WriterConnectionResumed, RollbackChecksumOnResume) { auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; auto first_response = google::storage::v2::BidiWriteObjectResponse{}; - EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const&) { return sequencer.PushBack("Flush").then([](auto f) { if (!f.get()) return TransientError(); @@ -1255,9 +1259,10 @@ TEST(WriterConnectionResumed, RollbackChecksumOnResume) { EXPECT_CALL(mock_factory, Call(_)).WillOnce([&](auto const&) { WriteObject::WriteResult result; result.stream = std::move(mock_stream); - return sequencer.PushBack("Factory").then([r = std::move(result)](auto) mutable { - return StatusOr(std::move(r)); - }); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); }); EXPECT_CALL(*mock_stream_ptr, Write).WillRepeatedly([&](auto const&, auto) { From 0a264392fce0ecb75a1a3847f070fee7ef205adc Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 07:08:54 +0000 Subject: [PATCH 4/8] fix(storage): fix data races when accessing impl_ in AsyncWriterConnectionResumed --- .../internal/async/writer_connection_resumed.cc | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 31d93ed2816d8..55a6eb212af65 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -319,9 +319,9 @@ class AsyncWriterConnectionResumedState void OnClose(Status result) { if (!result.ok()) return Resume(std::move(result)); - auto checksums = impl_->PersistedChecksums(); + std::unique_lock lk(mu_); + auto checksums = Impl(lk)->PersistedChecksums(); if (checksums && checksums->has_crc32c()) { - std::unique_lock lk(mu_); auto it = crc32c_history_.find(buffer_offset_); if (it != crc32c_history_.end() && it->second != checksums->crc32c()) { SetError(std::move(lk), google::cloud::internal::DataLossError( @@ -330,7 +330,7 @@ class AsyncWriterConnectionResumedState return; } } - SetClosed(std::unique_lock(mu_), std::move(result)); + SetClosed(std::move(lk), std::move(result)); } void FlushStep(std::unique_lock lk, absl::Cord payload) { @@ -368,9 +368,9 @@ class AsyncWriterConnectionResumedState return; } auto const persisted_size = *query_res; - auto checksums = self->impl_->PersistedChecksums(); + std::unique_lock lk(self->mu_); + auto checksums = self->Impl(lk)->PersistedChecksums(); if (checksums && checksums->has_crc32c()) { - std::unique_lock lk(self->mu_); auto it = self->crc32c_history_.find(persisted_size); if (it != self->crc32c_history_.end() && it->second != checksums->crc32c()) { @@ -381,6 +381,7 @@ class AsyncWriterConnectionResumedState return; } } + lk.unlock(); self->OnQuery(std::move(query_res)); self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); @@ -406,7 +407,7 @@ class AsyncWriterConnectionResumedState } void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { - auto handle = impl_->WriteHandle(); + auto handle = Impl(lk)->WriteHandle(); if (handle) { latest_write_handle_ = *std::move(handle); } @@ -580,14 +581,14 @@ class AsyncWriterConnectionResumedState checksums = first_res.persisted_data_checksums(); } } else { - auto state = impl_->PersistedState(); + auto state = Impl(lk)->PersistedState(); if (absl::holds_alternative(state)) { finalized = true; finalized_object = absl::get(std::move(state)); } else { persisted_offset = absl::get(state); - checksums = impl_->PersistedChecksums(); + checksums = Impl(lk)->PersistedChecksums(); } } From 33af3f098d75937fe967e362e12afe3eca4548cb Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 08:32:35 +0000 Subject: [PATCH 5/8] fix(storage): resolve hash function lifecycle and rollback corruption in appendable uploads --- .../storage/internal/async/connection_impl.cc | 42 ++++++++--- .../internal/async/writer_connection_impl.cc | 4 +- .../async/writer_connection_impl_test.cc | 5 +- .../async/writer_connection_resumed.cc | 70 +++++++++++-------- 4 files changed, 77 insertions(+), 44 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index aca0843d1bc9d..7a0d45e6589fc 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -110,6 +110,35 @@ std::unique_ptr CreateHashFunction( return storage::internal::CreateNullHashFunction(); } +std::shared_ptr CreateAppendableHashFunction( + Options const& options) { + auto crc32c = std::unique_ptr(); + if (options.has()) { + crc32c = std::make_unique( + storage::internal::HashValues{ + Crc32cFromProto(options.get()), + /*.md5=*/{}}); + } else if (options.get()) { + crc32c = std::make_unique(); + } + + auto md5 = std::unique_ptr(); + if (options.has()) { + md5 = std::make_unique( + storage::internal::HashValues{ + /*.crc32=*/{}, + MD5FromProto(options.get())}); + } + + if (crc32c && md5) { + return std::make_shared( + std::move(crc32c), std::move(md5)); + } + if (crc32c) return std::move(crc32c); + if (md5) return std::move(md5); + return storage::internal::CreateNullHashFunction(); +} + StatusOr> MakeAppendableWriter( google::cloud::internal::ImmutableOptions const& current, google::storage::v2::BidiWriteObjectRequest request, @@ -125,27 +154,22 @@ StatusOr> MakeAppendableWriter( if (rpc->first_response.has_resource()) { auto const& resource = rpc->first_response.resource(); + hash = CreateAppendableHashFunction(*current); if (current->get() && resource.has_checksums() && resource.checksums().has_crc32c()) { - hash = std::make_shared< - ::google::cloud::storage::internal::Crc32cHashFunction>( - resource.checksums().crc32c(), resource.size()); - } else { - hash = CreateHashFunction(*current); + hash->RestoreCrc32c(resource.checksums().crc32c(), resource.size()); } impl = std::make_unique( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); + hash = CreateAppendableHashFunction(*current); if (current->get() && rpc->first_response.has_persisted_data_checksums() && rpc->first_response.persisted_data_checksums().has_crc32c()) { - hash = std::make_shared< - ::google::cloud::storage::internal::Crc32cHashFunction>( + hash->RestoreCrc32c( rpc->first_response.persisted_data_checksums().crc32c(), persisted_size); - } else { - hash = CreateHashFunction(*current); } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 387580bf383be..93591b4965463 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -150,7 +150,7 @@ AsyncWriterConnectionImpl::Finalize( auto size = p.size(); if (p.empty() && expected_checksum.has_value()) { - auto const actual = hash_function_->Finish().crc32c; + auto const actual = FormatCrc32c(hash_function_->CurrentCrc32c()); if (!actual.empty() && expected_checksum->value() != actual) { return make_ready_future(StatusOr( google::cloud::internal::DataLossError( @@ -171,7 +171,7 @@ AsyncWriterConnectionImpl::Finalize( coro.reset(); // breaks the cycle between the completion queue and coro auto res = f.get(); if (res.ok() && *res && expected_checksum.has_value()) { - auto const actual = hash_function_->Finish().crc32c; + auto const actual = FormatCrc32c(hash_function_->CurrentCrc32c()); if (!actual.empty() && expected_checksum->value() != actual) { return make_ready_future(StatusOr( google::cloud::internal::DataLossError( diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index 2fda3bb782570..52277bbdd4729 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -911,8 +911,7 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchImmediate) { return make_ready_future(Status{}); }); auto hash = std::make_shared(); - EXPECT_CALL(*hash, Finish) - .WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); auto tested = std::make_unique( TestOptions(), MakeRequest(), std::move(mock), hash, 1024); @@ -934,6 +933,7 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMatchImmediate) { return make_ready_future(absl::make_optional(MakeTestResponse())); }); auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); EXPECT_CALL(*hash, Finish) .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); @@ -959,6 +959,7 @@ TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchOnComplete) { }); auto hash = std::make_shared(); EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); EXPECT_CALL(*hash, Finish) .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 55a6eb212af65..cb10557a4737f 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -322,7 +322,12 @@ class AsyncWriterConnectionResumedState std::unique_lock lk(mu_); auto checksums = Impl(lk)->PersistedChecksums(); if (checksums && checksums->has_crc32c()) { - auto it = crc32c_history_.find(buffer_offset_); + auto const state = Impl(lk)->PersistedState(); + auto const persisted_size = + absl::holds_alternative(state) + ? absl::get(state).size() + : absl::get(state); + auto it = crc32c_history_.find(persisted_size); if (it != crc32c_history_.end() && it->second != checksums->crc32c()) { SetError(std::move(lk), google::cloud::internal::DataLossError( "client/server checksum mismatch at Close", @@ -406,6 +411,38 @@ class AsyncWriterConnectionResumedState return tmp; } + void RestoreChecksumState(std::int64_t persisted_size) { + if (!hash_function_ || !hash_function_->CurrentCrc32c().has_value()) return; + auto it = crc32c_history_.find(persisted_size); + if (it != crc32c_history_.end()) { + hash_function_->RestoreCrc32c(it->second, persisted_size); + } else if (!crc32c_history_.empty()) { + auto upper = crc32c_history_.upper_bound(persisted_size); + if (upper != crc32c_history_.begin()) { + --upper; + auto const y = upper->first; + auto const crc_y = upper->second; + hash_function_->RestoreCrc32c(crc_y, y); + if (y < persisted_size) { + auto const slice_offset = + static_cast(y - buffer_offset_); + auto const slice_len = static_cast(persisted_size - y); + if (slice_offset + slice_len <= resend_buffer_.size()) { + auto slice = resend_buffer_.Subcord(slice_offset, slice_len); + (void)hash_function_->Update(y, slice, Crc32c(slice)); + } + } + if (auto current = hash_function_->CurrentCrc32c()) { + crc32c_history_[persisted_size] = *current; + } + } + } + auto purge_it = crc32c_history_.upper_bound(persisted_size); + crc32c_history_.erase(purge_it, crc32c_history_.end()); + crc32c_history_.erase(crc32c_history_.begin(), + crc32c_history_.lower_bound(persisted_size)); + } + void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { auto handle = Impl(lk)->WriteHandle(); if (handle) { @@ -423,36 +460,7 @@ class AsyncWriterConnectionResumedState MakeFastForwardError(buffer_offset_, persisted_size, GCP_ERROR_INFO())); } - if (hash_function_ && hash_function_->CurrentCrc32c().has_value()) { - auto it = crc32c_history_.find(persisted_size); - if (it != crc32c_history_.end()) { - hash_function_->RestoreCrc32c(it->second, persisted_size); - } else if (!crc32c_history_.empty()) { - auto upper = crc32c_history_.upper_bound(persisted_size); - if (upper != crc32c_history_.begin()) { - --upper; - auto const y = upper->first; - auto const crc_y = upper->second; - hash_function_->RestoreCrc32c(crc_y, y); - if (y < persisted_size) { - auto const slice_offset = - static_cast(y - buffer_offset_); - auto const slice_len = static_cast(persisted_size - y); - if (slice_offset + slice_len <= resend_buffer_.size()) { - auto slice = resend_buffer_.Subcord(slice_offset, slice_len); - (void)hash_function_->Update(y, slice, Crc32c(slice)); - } - } - if (auto current = hash_function_->CurrentCrc32c()) { - crc32c_history_[persisted_size] = *current; - } - } - } - auto purge_it = crc32c_history_.upper_bound(persisted_size); - crc32c_history_.erase(purge_it, crc32c_history_.end()); - crc32c_history_.erase(crc32c_history_.begin(), - crc32c_history_.lower_bound(persisted_size)); - } + RestoreChecksumState(persisted_size); resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; if (state_ == State::kResuming) { From edad85213acba3cea97412ed4c03081bded4b32f Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 08:45:35 +0000 Subject: [PATCH 6/8] fix(storage): preserve ABI compatibility by adding explicit Finalize overloads without default arguments --- google/cloud/storage/async/writer.cc | 10 ++++++++++ google/cloud/storage/async/writer.h | 9 +++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/async/writer.cc b/google/cloud/storage/async/writer.cc index e030269392f95..5f8a9b648db65 100644 --- a/google/cloud/storage/async/writer.cc +++ b/google/cloud/storage/async/writer.cc @@ -69,6 +69,11 @@ future> AsyncWriter::Write(AsyncToken token, }); } +future> AsyncWriter::Finalize( + AsyncToken token, WritePayload payload) { + return Finalize(std::move(token), std::move(payload), absl::nullopt); +} + future> AsyncWriter::Finalize( AsyncToken token, WritePayload payload, absl::optional const& expected_checksum) { @@ -82,6 +87,11 @@ future> AsyncWriter::Finalize( .then([impl = impl_](auto f) { return f.get(); }); } +future> AsyncWriter::Finalize( + AsyncToken token) { + return Finalize(std::move(token), WritePayload{}, absl::nullopt); +} + future> AsyncWriter::Finalize( AsyncToken token, absl::optional const& expected_checksum) { diff --git a/google/cloud/storage/async/writer.h b/google/cloud/storage/async/writer.h index 8bbd474e4a370..52bfa8132b887 100644 --- a/google/cloud/storage/async/writer.h +++ b/google/cloud/storage/async/writer.h @@ -116,18 +116,19 @@ class AsyncWriter { future> Write(AsyncToken token, WritePayload payload); /// Finalize the upload with the existing data. + future> Finalize(AsyncToken token); future> Finalize( AsyncToken token, - absl::optional const& expected_checksum = - absl::nullopt); + absl::optional const& expected_checksum); /** * Upload @p payload and then finalize the upload. */ + future> Finalize(AsyncToken token, + WritePayload payload); future> Finalize( AsyncToken token, WritePayload payload, - absl::optional const& expected_checksum = - absl::nullopt); + absl::optional const& expected_checksum); /** * Flush any buffered data to the service. From 1399a0337f0df10b07349156a23d1136cd8d67c3 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 09:05:48 +0000 Subject: [PATCH 7/8] fix(storage): resolve missing FormatCrc32c declaration and redundant move warnings --- google/cloud/storage/internal/async/connection_impl.cc | 4 ++-- .../storage/internal/async/writer_connection_impl.cc | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 7a0d45e6589fc..faf72a3cc0f85 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -134,8 +134,8 @@ std::shared_ptr CreateAppendableHashFunction( return std::make_shared( std::move(crc32c), std::move(md5)); } - if (crc32c) return std::move(crc32c); - if (md5) return std::move(md5); + if (crc32c) return crc32c; + if (md5) return md5; return storage::internal::CreateNullHashFunction(); } diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 93591b4965463..d3b0a23f939dd 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -16,9 +16,11 @@ #include "google/cloud/storage/internal/async/handle_redirect_error.h" #include "google/cloud/storage/internal/async/partial_upload.h" #include "google/cloud/storage/internal/async/write_payload_impl.h" +#include "google/cloud/storage/internal/base64.h" #include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h" #include "google/cloud/storage/internal/grpc/object_metadata_parser.h" #include "google/cloud/storage/internal/hash_function_impl.h" +#include "google/cloud/internal/big_endian.h" #include "google/cloud/internal/make_status.h" namespace google { @@ -27,6 +29,12 @@ namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +std::string FormatCrc32c(absl::optional crc) { + if (!crc) return {}; + return storage::internal::Base64Encode( + google::cloud::internal::EncodeBigEndian(*crc)); +} + auto HandleFinishAfterError(std::string msg) { return [m = std::move(msg)](future f) { auto status = f.get(); From 006eedaee61419208c1350f7aac8bacca93628db Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 30 Jun 2026 10:14:44 +0000 Subject: [PATCH 8/8] fix(storage): prevent unsigned integer underflow during CRC32C rollback slice calculations --- .../cloud/storage/internal/async/writer_connection_resumed.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index cb10557a4737f..1f41cb4de578b 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -423,7 +423,7 @@ class AsyncWriterConnectionResumedState auto const y = upper->first; auto const crc_y = upper->second; hash_function_->RestoreCrc32c(crc_y, y); - if (y < persisted_size) { + if (y >= buffer_offset_ && y < persisted_size) { auto const slice_offset = static_cast(y - buffer_offset_); auto const slice_len = static_cast(persisted_size - y);