diff --git a/google/cloud/storage/async/writer.cc b/google/cloud/storage/async/writer.cc index 5195a55ef4f05..5f8a9b648db65 100644 --- a/google/cloud/storage/async/writer.cc +++ b/google/cloud/storage/async/writer.cc @@ -71,20 +71,31 @@ future> AsyncWriter::Write(AsyncToken token, future> AsyncWriter::Finalize( AsyncToken token, WritePayload payload) { + return Finalize(std::move(token), std::move(payload), absl::nullopt); +} + +future> AsyncWriter::Finalize( + AsyncToken token, WritePayload payload, + absl::optional const& expected_checksum) { if (!impl_) return StreamError(GCP_ERROR_INFO()); auto t = storage_internal::MakeAsyncToken(impl_.get()); if (token != t) { return TokenError(GCP_ERROR_INFO()); } - return impl_->Finalize(std::move(payload)).then([impl = impl_](auto f) { - return f.get(); - }); + return impl_->Finalize(std::move(payload), expected_checksum) + .then([impl = impl_](auto f) { return f.get(); }); } future> AsyncWriter::Finalize( AsyncToken token) { - return Finalize(std::move(token), WritePayload{}); + return Finalize(std::move(token), WritePayload{}, absl::nullopt); +} + +future> AsyncWriter::Finalize( + AsyncToken token, + absl::optional const& expected_checksum) { + return Finalize(std::move(token), WritePayload{}, expected_checksum); } future AsyncWriter::Flush() { diff --git a/google/cloud/storage/async/writer.h b/google/cloud/storage/async/writer.h index f39f8f5bee3db..52bfa8132b887 100644 --- a/google/cloud/storage/async/writer.h +++ b/google/cloud/storage/async/writer.h @@ -18,6 +18,7 @@ #include "google/cloud/storage/async/token.h" #include "google/cloud/storage/async/write_payload.h" #include "google/cloud/storage/async/writer_connection.h" +#include "google/cloud/storage/hashing_options.h" #include "google/cloud/future.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" @@ -116,12 +117,18 @@ class AsyncWriter { /// Finalize the upload with the existing data. future> Finalize(AsyncToken token); + future> Finalize( + AsyncToken token, + absl::optional const& expected_checksum); /** * Upload @p payload and then finalize the upload. */ future> Finalize(AsyncToken token, WritePayload payload); + future> Finalize( + AsyncToken token, WritePayload payload, + absl::optional const& expected_checksum); /** * Flush any buffered data to the service. diff --git a/google/cloud/storage/async/writer_connection.h b/google/cloud/storage/async/writer_connection.h index 3ab827623b99e..08ce48d07bd04 100644 --- a/google/cloud/storage/async/writer_connection.h +++ b/google/cloud/storage/async/writer_connection.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_WRITER_CONNECTION_H #include "google/cloud/storage/async/write_payload.h" +#include "google/cloud/storage/hashing_options.h" #include "google/cloud/storage/object_metadata.h" #include "google/cloud/future.h" #include "google/cloud/rpc_metadata.h" @@ -112,6 +113,11 @@ class AsyncWriterConnection { /// Finalizes an upload. virtual future> Finalize( WritePayload) = 0; + virtual future> Finalize( + WritePayload p, + absl::optional const& /*expected_checksum*/) { + return Finalize(std::move(p)); + } /// Uploads some data to the service and flushes the value. virtual future Flush(WritePayload payload) = 0; diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 303e420f91e86..faf72a3cc0f85 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -110,6 +110,35 @@ std::unique_ptr CreateHashFunction( return storage::internal::CreateNullHashFunction(); } +std::shared_ptr CreateAppendableHashFunction( + Options const& options) { + auto crc32c = std::unique_ptr(); + if (options.has()) { + crc32c = std::make_unique( + storage::internal::HashValues{ + Crc32cFromProto(options.get()), + /*.md5=*/{}}); + } else if (options.get()) { + crc32c = std::make_unique(); + } + + auto md5 = std::unique_ptr(); + if (options.has()) { + md5 = std::make_unique( + storage::internal::HashValues{ + /*.crc32=*/{}, + MD5FromProto(options.get())}); + } + + if (crc32c && md5) { + return std::make_shared( + std::move(crc32c), std::move(md5)); + } + if (crc32c) return crc32c; + if (md5) return md5; + return storage::internal::CreateNullHashFunction(); +} + StatusOr> MakeAppendableWriter( google::cloud::internal::ImmutableOptions const& current, google::storage::v2::BidiWriteObjectRequest request, @@ -125,19 +154,23 @@ StatusOr> MakeAppendableWriter( if (rpc->first_response.has_resource()) { auto const& resource = rpc->first_response.resource(); + hash = CreateAppendableHashFunction(*current); if (current->get() && resource.has_checksums() && resource.checksums().has_crc32c()) { - hash = std::make_shared< - ::google::cloud::storage::internal::Crc32cHashFunction>( - resource.checksums().crc32c(), resource.size()); - } else { - hash = CreateHashFunction(*current); + hash->RestoreCrc32c(resource.checksums().crc32c(), resource.size()); } impl = std::make_unique( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); - hash = CreateHashFunction(*current); + hash = CreateAppendableHashFunction(*current); + if (current->get() && + rpc->first_response.has_persisted_data_checksums() && + rpc->first_response.persisted_data_checksums().has_crc32c()) { + hash->RestoreCrc32c( + rpc->first_response.persisted_data_checksums().crc32c(), + persisted_size); + } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( rpc->first_response.persisted_data_checksums()) diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index ef541d8f7b972..cb6244acf2a89 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -631,262 +631,6 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { next.first.set_value(true); } -TEST_F(AsyncConnectionImplAppendableTest, - StartAppendableObjectUploadWithChecksum) { - auto constexpr kRequestText = R"pb( - write_object_spec { - resource { - bucket: "projects/_/buckets/test-bucket" - name: "test-object" - content_type: "text/plain" - } - } - )pb"; - AsyncSequencer sequencer; - auto mock = std::make_shared(); - - google::storage::v2::Object initial_resource; - initial_resource.set_bucket("projects/_/buckets/test-bucket"); - initial_resource.set_name("test-object"); - initial_resource.set_size(1024); - initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC - - auto stream = std::make_unique(); - EXPECT_CALL(*stream, Start).WillOnce([&] { - return sequencer.PushBack("Start"); - }); - - EXPECT_CALL(*stream, Read) - .WillOnce([&, initial_resource] { - return sequencer.PushBack("Read(Takeover)") - .then([initial_resource](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - *response.mutable_resource() = initial_resource; - return absl::make_optional(std::move(response)); - }); - }) - .WillOnce([&, initial_resource] { - return sequencer.PushBack("Read(FinalObject)") - .then([initial_resource](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - *response.mutable_resource() = initial_resource; - response.mutable_resource()->set_size( - initial_resource.size() + 9); // "some data" size is 9 - return absl::make_optional(std::move(response)); - }); - }); - - EXPECT_CALL(*stream, Cancel).Times(1); - EXPECT_CALL(*stream, Finish).WillOnce([&] { - return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); - }); - - EXPECT_CALL(*stream, Write) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.state_lookup()); - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(StateLookup)"); - }) - .WillOnce( - [&](google::storage::v2::BidiWriteObjectRequest const& /*request*/, - grpc::WriteOptions wopt) { - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(data)"); - }) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.finish_write()); - EXPECT_TRUE(wopt.is_last_message()); - // Here we expect full checksums to be set because we had the resource - // in takeover. - EXPECT_TRUE(request.has_object_checksums()); - auto expected_crc = - google::cloud::storage_internal::ExtendCrc32c(12345, "some data"); - EXPECT_EQ(request.object_checksums().crc32c(), expected_crc); - return sequencer.PushBack("Write(Finalize)"); - }); - - EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] { - return std::unique_ptr(std::move(stream)); - }); - - internal::AutomaticallyCreatedBackgroundThreads pool(1); - // Enable CRC32C validation in options - auto options = TestOptions().set(true); - auto connection = MakeTestConnection(pool.cq(), mock, options); - - auto request = google::storage::v2::BidiWriteObjectRequest{}; - ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); - request.mutable_write_object_spec()->set_appendable(true); - - auto pending = connection->StartAppendableObjectUpload( - {std::move(request), connection->options()}); - - auto next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Start"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(StateLookup)"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(Takeover)"); - next.first.set_value(true); - - auto r = pending.get(); - ASSERT_STATUS_OK(r); - auto writer = *std::move(r); - - // Write some data. - auto w1 = writer->Write(storage::WritePayload("some data")); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(data)"); - next.first.set_value(true); - EXPECT_STATUS_OK(w1.get()); - - // Finalize the upload. - auto w2 = writer->Finalize({}); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(Finalize)"); - next.first.set_value(true); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(FinalObject)"); - next.first.set_value(true); - - auto response = w2.get(); - ASSERT_STATUS_OK(response); - - writer.reset(); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Finish"); - next.first.set_value(true); -} - -// TODO(#16174): Figure out why this test fails to compile in MSVC. -#ifndef _WIN32 - -TEST_F(AsyncConnectionImplAppendableTest, - ResumeAppendableObjectUploadWithChecksum) { - auto constexpr kRequestText = R"pb( - append_object_spec { object: "test-object" } - )pb"; - AsyncSequencer sequencer; - auto mock = std::make_shared(); - - constexpr std::int64_t kPersistedSize = 16384; - constexpr std::uint32_t kPersistedCrc = 12345; - - auto stream = std::make_unique(); - EXPECT_CALL(*stream, Start).WillOnce([&] { - return sequencer.PushBack("Start"); - }); - - EXPECT_CALL(*stream, Read) - .WillOnce([&] { - return sequencer.PushBack("Read(PersistedSize)").then([](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - response.set_persisted_size(kPersistedSize); - response.mutable_persisted_data_checksums()->set_crc32c( - kPersistedCrc); - return absl::make_optional(std::move(response)); - }); - }) - .WillOnce([&] { - return sequencer.PushBack("Read(FinalObject)").then([](auto) { - auto response = google::storage::v2::BidiWriteObjectResponse{}; - auto object = google::storage::v2::Object{}; - object.set_bucket("projects/_/buckets/test-bucket"); - object.set_name("test-object"); - object.set_size(kPersistedSize + 9); - *response.mutable_resource() = std::move(object); - return absl::make_optional(std::move(response)); - }); - }); - - EXPECT_CALL(*stream, Cancel).Times(1); - EXPECT_CALL(*stream, Finish).WillOnce([&] { - return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); - }); - - EXPECT_CALL(*stream, Write) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.state_lookup()); - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(StateLookup)"); - }) - .WillOnce( - [&](google::storage::v2::BidiWriteObjectRequest const& /*request*/, - grpc::WriteOptions wopt) { - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(data)"); - }) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.finish_write()); - EXPECT_TRUE(wopt.is_last_message()); - EXPECT_TRUE(request.has_object_checksums()); - EXPECT_EQ(request.object_checksums().crc32c(), 2901820631); - return sequencer.PushBack("Write(Finalize)"); - }); - - EXPECT_CALL(*mock, AsyncBidiWriteObject) - .WillOnce([&](auto const&, auto, auto) { - return std::unique_ptr(std::move(stream)); - }); - - internal::AutomaticallyCreatedBackgroundThreads pool(1); - auto options = TestOptions().set(true); - auto connection = MakeTestConnection(pool.cq(), mock, options); - - auto request = google::storage::v2::BidiWriteObjectRequest{}; - ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); - auto pending = connection->ResumeAppendableObjectUpload( - {std::move(request), connection->options()}); - - auto next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Start"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(StateLookup)"); - next.first.set_value(true); - - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(PersistedSize)"); - next.first.set_value(true); - - auto r = pending.get(); - ASSERT_STATUS_OK(r); - auto writer = *std::move(r); - - auto w1 = writer->Write(storage::WritePayload("some data")); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(data)"); - next.first.set_value(true); - EXPECT_STATUS_OK(w1.get()); - - auto w2 = writer->Finalize({}); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Write(Finalize)"); - next.first.set_value(true); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read(FinalObject)"); - next.first.set_value(true); - - auto response = w2.get(); - ASSERT_STATUS_OK(response); - - writer.reset(); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Finish"); - next.first.set_value(true); -} - -#endif - } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index d00ead1f2cf40..18713a73062da 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -106,6 +106,11 @@ class AsyncWriterConnectionBufferedState return Impl(std::unique_lock(mu_))->PersistedState(); } + absl::optional PersistedChecksums() + const { + return Impl(std::unique_lock(mu_))->PersistedChecksums(); + } + future Write(storage::WritePayload const& p) { std::unique_lock lk(mu_); resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); @@ -114,7 +119,15 @@ class AsyncWriterConnectionBufferedState future> Finalize( storage::WritePayload const& p) { + return Finalize(p, absl::nullopt); + } + future> Finalize( + storage::WritePayload const& p, + absl::optional const& expected_checksum) { std::unique_lock lk(mu_); + if (expected_checksum.has_value()) { + expected_checksum_ = expected_checksum; + } resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); finalize_ = true; HandleNewData(std::move(lk), true); @@ -246,7 +259,7 @@ class AsyncWriterConnectionBufferedState auto impl = Impl(lk); lk.unlock(); // Finalize with an empty payload. - (void)impl->Finalize(storage::WritePayload{}) + (void)impl->Finalize(storage::WritePayload{}, expected_checksum_) .then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) return self->OnFinalize(f.get()); }); @@ -636,6 +649,7 @@ class AsyncWriterConnectionBufferedState // Retrieve the future in the constructor, as some operations reset // finalized_. future> finalized_future_; + absl::optional expected_checksum_; // The result of calling `Close()`. Note that only one such call is ever // made. @@ -763,13 +777,24 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection { return state_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return state_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { return state_->Write(std::move(p)); } future> Finalize( storage::WritePayload p) override { - return state_->Finalize(std::move(p)); + return Finalize(std::move(p), absl::nullopt); + } + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { + return state_->Finalize(std::move(p), expected_checksum); } future Flush(storage::WritePayload p) override { diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 92b3990b3c586..d3b0a23f939dd 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -16,9 +16,11 @@ #include "google/cloud/storage/internal/async/handle_redirect_error.h" #include "google/cloud/storage/internal/async/partial_upload.h" #include "google/cloud/storage/internal/async/write_payload_impl.h" +#include "google/cloud/storage/internal/base64.h" #include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h" #include "google/cloud/storage/internal/grpc/object_metadata_parser.h" #include "google/cloud/storage/internal/hash_function_impl.h" +#include "google/cloud/internal/big_endian.h" #include "google/cloud/internal/make_status.h" namespace google { @@ -27,6 +29,12 @@ namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +std::string FormatCrc32c(absl::optional crc) { + if (!crc) return {}; + return storage::internal::Base64Encode( + google::cloud::internal::EncodeBigEndian(*crc)); +} + auto HandleFinishAfterError(std::string msg) { return [m = std::move(msg)](future f) { auto status = f.get(); @@ -140,27 +148,48 @@ future AsyncWriterConnectionImpl::Write(storage::WritePayload payload) { } future> -AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { +AsyncWriterConnectionImpl::Finalize( + storage::WritePayload payload, + absl::optional const& expected_checksum) { auto write = MakeRequest(); write.set_finish_write(true); auto p = WritePayloadImpl::GetImpl(payload); auto size = p.size(); - auto action = PartialUpload::kFinalizeWithChecksum; - if (request_.has_append_object_spec() || - request_.write_object_spec().appendable()) { - if (!absl::holds_alternative( - persisted_state_) && - !persisted_data_checksums_.has_value()) { - action = PartialUpload::kFinalize; + + if (p.empty() && expected_checksum.has_value()) { + auto const actual = FormatCrc32c(hash_function_->CurrentCrc32c()); + if (!actual.empty() && expected_checksum->value() != actual) { + return make_ready_future(StatusOr( + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum->value() + " got " + actual, + GCP_ERROR_INFO()))); } } + + auto action = request_.has_append_object_spec() || + request_.write_object_spec().appendable() + ? PartialUpload::kFinalize + : PartialUpload::kFinalizeWithChecksum; auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); - return coro->Start().then([coro, size, this](auto f) mutable { - coro.reset(); // breaks the cycle between the completion queue and coro - return OnFinalUpload(size, f.get()); - }); + return coro->Start().then( + [coro, size, expected_checksum, this](auto f) mutable { + coro.reset(); // breaks the cycle between the completion queue and coro + auto res = f.get(); + if (res.ok() && *res && expected_checksum.has_value()) { + auto const actual = FormatCrc32c(hash_function_->CurrentCrc32c()); + if (!actual.empty() && expected_checksum->value() != actual) { + return make_ready_future(StatusOr( + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum->value() + " got " + actual, + GCP_ERROR_INFO()))); + } + } + return OnFinalUpload(size, std::move(res)); + }); } future AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) { diff --git a/google/cloud/storage/internal/async/writer_connection_impl.h b/google/cloud/storage/internal/async/writer_connection_impl.h index cedfd1005bf48..3550a0cb1ebae 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.h +++ b/google/cloud/storage/internal/async/writer_connection_impl.h @@ -69,7 +69,13 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection { future Write(storage::WritePayload payload) override; future> Finalize( - storage::WritePayload) override; + storage::WritePayload payload) override { + return Finalize(std::move(payload), absl::nullopt); + } + future> Finalize( + storage::WritePayload payload, + absl::optional const& expected_checksum) + override; future Flush(storage::WritePayload payload) override; future Close(storage::WritePayload payload) override; future> Query() override; diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index 03ecf7699cc2c..52277bbdd4729 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -904,6 +904,75 @@ TEST(AsyncWriterConnectionTest, CloseError) { EXPECT_THAT(response.get(), StatusIs(PermanentError().code())); } +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchImmediate) { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload{}, + storage::Crc32cChecksumValue("AAAAAA==")); + EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss)); +} + +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMatchImmediate) { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*mock, Write).WillOnce([](Request const&, grpc::WriteOptions) { + return make_ready_future(true); + }); + EXPECT_CALL(*mock, Read).WillOnce([] { + return make_ready_future(absl::make_optional(MakeTestResponse())); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); + EXPECT_CALL(*hash, Finish) + .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload{}, + storage::Crc32cChecksumValue("ImIEBA==")); + EXPECT_THAT(response.get(), IsOk()); +} + +TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchOnComplete) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*mock, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write"); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(0x22620404)); + EXPECT_CALL(*hash, Finish) + .WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""})); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Finalize(WritePayload(std::string(128, 'A')), + storage::Crc32cChecksumValue("AAAAAA==")); + auto next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Write"); + next.first.set_value(true); + EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss)); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 6d96e523a25d8..1f41cb4de578b 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -15,6 +15,7 @@ #include "google/cloud/storage/internal/async/writer_connection_resumed.h" #include "google/cloud/storage/internal/async/write_payload_impl.h" #include "google/cloud/storage/internal/async/writer_connection_impl.h" +#include "google/cloud/storage/internal/crc32c.h" #include "google/cloud/storage/internal/hash_function_impl.h" #include "google/cloud/future.h" #include "google/cloud/internal/make_status.h" @@ -23,6 +24,7 @@ #include "absl/strings/cord.h" #include #include +#include #include #include #include @@ -85,6 +87,11 @@ class AsyncWriterConnectionResumedState } else { buffer_offset_ = absl::get(state); } + if (hash_function_) { + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_] = *crc; + } + } if (first_response_.has_write_handle()) { latest_write_handle_ = first_response_.write_handle(); } else if (initial_request_.has_append_object_spec() && @@ -116,6 +123,11 @@ class AsyncWriterConnectionResumedState return Impl(std::unique_lock(mu_))->PersistedState(); } + absl::optional PersistedChecksums() + const { + return Impl(std::unique_lock(mu_))->PersistedChecksums(); + } + future Write(storage::WritePayload const& p) { std::unique_lock lk(mu_); resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); @@ -124,7 +136,15 @@ class AsyncWriterConnectionResumedState future> Finalize( storage::WritePayload const& p) { + return Finalize(p, absl::nullopt); + } + future> Finalize( + storage::WritePayload const& p, + absl::optional const& expected_checksum) { std::unique_lock lk(mu_); + if (expected_checksum.has_value()) { + expected_checksum_ = expected_checksum; + } resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); finalize_ = true; HandleNewData(std::move(lk)); @@ -255,13 +275,24 @@ class AsyncWriterConnectionResumedState // If another thread initiated FinalizeStep concurrently, just return. return; } + if (expected_checksum_.has_value() && hash_function_) { + auto const actual = hash_function_->Finish().crc32c; + if (!actual.empty() && expected_checksum_->value() != actual) { + SetError(std::move(lk), + google::cloud::internal::DataLossError( + "client checksum mismatch: expected " + + expected_checksum_->value() + " got " + actual, + GCP_ERROR_INFO())); + return; + } + } // Mark that we are starting the finalization process. state_ = State::kWriting; finalizing_ = true; auto impl = Impl(lk); lk.unlock(); // Finalize with an empty payload. - (void)impl->Finalize(storage::WritePayload{}) + (void)impl->Finalize(storage::WritePayload{}, expected_checksum_) .then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) return self->OnFinalize(f.get()); }); @@ -288,7 +319,23 @@ class AsyncWriterConnectionResumedState void OnClose(Status result) { if (!result.ok()) return Resume(std::move(result)); - SetClosed(std::unique_lock(mu_), std::move(result)); + std::unique_lock lk(mu_); + auto checksums = Impl(lk)->PersistedChecksums(); + if (checksums && checksums->has_crc32c()) { + auto const state = Impl(lk)->PersistedState(); + auto const persisted_size = + absl::holds_alternative(state) + ? absl::get(state).size() + : absl::get(state); + auto it = crc32c_history_.find(persisted_size); + if (it != crc32c_history_.end() && it->second != checksums->crc32c()) { + SetError(std::move(lk), google::cloud::internal::DataLossError( + "client/server checksum mismatch at Close", + GCP_ERROR_INFO())); + return; + } + } + SetClosed(std::move(lk), std::move(result)); } void FlushStep(std::unique_lock lk, absl::Cord payload) { @@ -308,12 +355,39 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; + if (hash_function_) { + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_ + write_offset_] = *crc; + } + } auto impl = Impl(lk); lk.unlock(); impl->Query().then([result, w = WeakFromThis()](auto f) { auto self = w.lock(); if (!self) return; - self->OnQuery(f.get()); + auto query_res = f.get(); + if (!query_res.ok()) { + self->OnQuery(std::move(query_res)); + self->SetFlushed(std::unique_lock(self->mu_), + std::move(result)); + return; + } + auto const persisted_size = *query_res; + std::unique_lock lk(self->mu_); + auto checksums = self->Impl(lk)->PersistedChecksums(); + if (checksums && checksums->has_crc32c()) { + auto it = self->crc32c_history_.find(persisted_size); + if (it != self->crc32c_history_.end() && + it->second != checksums->crc32c()) { + self->SetError(std::move(lk), + google::cloud::internal::DataLossError( + "client/server checksum mismatch at Flush", + GCP_ERROR_INFO())); + return; + } + } + lk.unlock(); + self->OnQuery(std::move(query_res)); self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); }); @@ -337,8 +411,40 @@ class AsyncWriterConnectionResumedState return tmp; } + void RestoreChecksumState(std::int64_t persisted_size) { + if (!hash_function_ || !hash_function_->CurrentCrc32c().has_value()) return; + auto it = crc32c_history_.find(persisted_size); + if (it != crc32c_history_.end()) { + hash_function_->RestoreCrc32c(it->second, persisted_size); + } else if (!crc32c_history_.empty()) { + auto upper = crc32c_history_.upper_bound(persisted_size); + if (upper != crc32c_history_.begin()) { + --upper; + auto const y = upper->first; + auto const crc_y = upper->second; + hash_function_->RestoreCrc32c(crc_y, y); + if (y >= buffer_offset_ && y < persisted_size) { + auto const slice_offset = + static_cast(y - buffer_offset_); + auto const slice_len = static_cast(persisted_size - y); + if (slice_offset + slice_len <= resend_buffer_.size()) { + auto slice = resend_buffer_.Subcord(slice_offset, slice_len); + (void)hash_function_->Update(y, slice, Crc32c(slice)); + } + } + if (auto current = hash_function_->CurrentCrc32c()) { + crc32c_history_[persisted_size] = *current; + } + } + } + auto purge_it = crc32c_history_.upper_bound(persisted_size); + crc32c_history_.erase(purge_it, crc32c_history_.end()); + crc32c_history_.erase(crc32c_history_.begin(), + crc32c_history_.lower_bound(persisted_size)); + } + void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { - auto handle = impl_->WriteHandle(); + auto handle = Impl(lk)->WriteHandle(); if (handle) { latest_write_handle_ = *std::move(handle); } @@ -354,6 +460,7 @@ class AsyncWriterConnectionResumedState MakeFastForwardError(buffer_offset_, persisted_size, GCP_ERROR_INFO())); } + RestoreChecksumState(persisted_size); resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; if (state_ == State::kResuming) { @@ -397,6 +504,9 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; + if (auto crc = hash_function_->CurrentCrc32c()) { + crc32c_history_[buffer_offset_ + write_offset_] = *crc; + } state_ = State::kIdle; return StartWriting(std::move(lk)); } @@ -479,14 +589,14 @@ class AsyncWriterConnectionResumedState checksums = first_res.persisted_data_checksums(); } } else { - auto state = impl_->PersistedState(); + auto state = Impl(lk)->PersistedState(); if (absl::holds_alternative(state)) { finalized = true; finalized_object = absl::get(std::move(state)); } else { persisted_offset = absl::get(state); - checksums = impl_->PersistedChecksums(); + checksums = Impl(lk)->PersistedChecksums(); } } @@ -508,15 +618,8 @@ class AsyncWriterConnectionResumedState } // Recreate the underlying stream if still active. - auto hash = hash_function_; - if (checksums && checksums->has_crc32c()) { - hash = std::make_shared< - ::google::cloud::storage::internal::Crc32cHashFunction>( - checksums->crc32c(), persisted_offset); - } - impl_ = std::make_unique( - options_, initial_request_, std::move(res->stream), std::move(hash), + options_, initial_request_, std::move(res->stream), hash_function_, persisted_offset, false, checksums); // OnQuery will restart the WriteLoop if necessary. OnQuery(std::move(lk), persisted_offset); @@ -699,6 +802,8 @@ class AsyncWriterConnectionResumedState // Retrieve the future in the constructor, as some operations reset // finalized_. future> finalized_future_; + absl::optional expected_checksum_; + std::map crc32c_history_; // The result of calling `Close()`. Note that only one such call is ever // made. @@ -836,13 +941,24 @@ class AsyncWriterConnectionResumed : public storage::AsyncWriterConnection { return state_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return state_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { return state_->Write(std::move(p)); } future> Finalize( storage::WritePayload p) override { - return state_->Finalize(std::move(p)); + return state_->Finalize(std::move(p), absl::nullopt); + } + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { + return state_->Finalize(std::move(p), expected_checksum); } future Flush(storage::WritePayload p) override { diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index e510ddad65e3c..fe00d804003b8 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -32,6 +32,7 @@ namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::storage::testing::MockHashFunction; using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::storage_mocks::MockAsyncWriterConnection; @@ -40,6 +41,8 @@ using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; using ::testing::_; +using ::testing::An; +using ::testing::AnyNumber; using ::testing::Eq; using ::testing::ResultOf; using ::testing::Return; @@ -1198,6 +1201,120 @@ TEST(WriterConnectionResumed, CloseFailsAndResumeSucceedsAndFinalized) { EXPECT_STATUS_OK(close.get()); } +TEST(WriterConnectionResumed, FinalizeExpectedChecksumMismatch) { + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c).WillRepeatedly(Return(100)); + EXPECT_CALL(*hash, Finish) + .WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""})); + MockFactory mock_factory; + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, hash, + first_response, Options{}); + auto finalize = + connection->Finalize({}, storage::Crc32cChecksumValue("AAAAAA==")); + EXPECT_THAT(finalize.get(), StatusIs(StatusCode::kDataLoss)); +} + +TEST(WriterConnectionResumed, RollbackChecksumOnResume) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const&) { + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + + auto hash = std::make_shared(); + EXPECT_CALL(*hash, CurrentCrc32c) + .WillOnce(Return(100)) + .WillOnce(Return(200)) + .WillRepeatedly(Return(200)); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(AnyNumber()); + EXPECT_CALL(*hash, RestoreCrc32c(_, _)).Times(AnyNumber()); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + google::storage::v2::BidiWriteObjectResponse query_resp1; + query_resp1.set_persisted_size(10); + google::storage::v2::BidiWriteObjectResponse query_resp2; + query_resp2.set_persisted_size(20); + EXPECT_CALL(mock_factory, Call(_)).WillOnce([&](auto const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write).WillRepeatedly([&](auto const&, auto) { + return sequencer.PushBack("StreamWrite").then([](auto) { return true; }); + }); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, query_resp1]() { + return sequencer.PushBack("StreamRead").then([query_resp1](auto) { + return absl::make_optional(query_resp1); + }); + }) + .WillRepeatedly([&, query_resp2]() { + return sequencer.PushBack("StreamRead").then([query_resp2](auto) { + return absl::make_optional(query_resp2); + }); + }); + EXPECT_CALL(*mock_stream_ptr, Cancel).Times(AnyNumber()); + EXPECT_CALL(*mock_stream_ptr, Finish).WillRepeatedly([] { + return make_ready_future(Status{}); + }); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, hash, + first_response, Options{}); + auto write = connection->Write(TestPayload(20)); + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead"); + next.first.set_value(true); + + EXPECT_STATUS_OK(write.get()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_tracing.cc b/google/cloud/storage/internal/async/writer_connection_tracing.cc index 96ad896a0369f..897e4c895944c 100644 --- a/google/cloud/storage/internal/async/writer_connection_tracing.cc +++ b/google/cloud/storage/internal/async/writer_connection_tracing.cc @@ -60,6 +60,11 @@ class AsyncWriterConnectionTracing : public storage::AsyncWriterConnection { return impl_->PersistedState(); } + absl::optional PersistedChecksums() + const override { + return impl_->PersistedChecksums(); + } + future Write(storage::WritePayload p) override { internal::OTelScope scope(span_); auto size = static_cast(p.size()); @@ -81,9 +86,16 @@ class AsyncWriterConnectionTracing : public storage::AsyncWriterConnection { future> Finalize( storage::WritePayload p) override { + return Finalize(std::move(p), absl::nullopt); + } + + future> Finalize( + storage::WritePayload p, + absl::optional const& expected_checksum) + override { internal::OTelScope scope(span_); auto size = static_cast(p.size()); - return impl_->Finalize(std::move(p)) + return impl_->Finalize(std::move(p), expected_checksum) .then([count = ++sent_count_, span = span_, size](auto f) { span->AddEvent( "gl-cpp.finalize", diff --git a/google/cloud/storage/internal/hash_function.h b/google/cloud/storage/internal/hash_function.h index da7699353739f..90038fad26578 100644 --- a/google/cloud/storage/internal/hash_function.h +++ b/google/cloud/storage/internal/hash_function.h @@ -21,6 +21,7 @@ #include "google/cloud/status.h" #include "absl/strings/cord.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include #include @@ -79,6 +80,12 @@ class HashFunction { virtual Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) = 0; + virtual absl::optional CurrentCrc32c() const { + return absl::nullopt; + } + virtual void RestoreCrc32c(std::uint32_t /*crc32c*/, + std::int64_t /*offset*/) {} + /** * Compute the final hash values. */ diff --git a/google/cloud/storage/internal/hash_function_impl.h b/google/cloud/storage/internal/hash_function_impl.h index 6b028d3cf0bab..ff3e0a0d6be0c 100644 --- a/google/cloud/storage/internal/hash_function_impl.h +++ b/google/cloud/storage/internal/hash_function_impl.h @@ -62,6 +62,15 @@ class CompositeFunction : public HashFunction { std::uint32_t buffer_crc) override; Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) override; + absl::optional CurrentCrc32c() const override { + auto c = a_->CurrentCrc32c(); + if (c.has_value()) return c; + return b_->CurrentCrc32c(); + } + void RestoreCrc32c(std::uint32_t crc32c, std::int64_t offset) override { + a_->RestoreCrc32c(crc32c, offset); + b_->RestoreCrc32c(crc32c, offset); + } HashValues Finish() override; private: @@ -120,6 +129,13 @@ class Crc32cHashFunction : public HashFunction { std::uint32_t buffer_crc) override; Status Update(std::int64_t offset, absl::Cord const& buffer, std::uint32_t buffer_crc) override; + absl::optional CurrentCrc32c() const override { + return current_; + } + void RestoreCrc32c(std::uint32_t crc32c, std::int64_t offset) override { + current_ = crc32c; + minimum_offset_ = offset; + } HashValues Finish() override; private: diff --git a/google/cloud/storage/testing/mock_hash_function.h b/google/cloud/storage/testing/mock_hash_function.h index e749019f8093a..90a4d741d8515 100644 --- a/google/cloud/storage/testing/mock_hash_function.h +++ b/google/cloud/storage/testing/mock_hash_function.h @@ -33,6 +33,9 @@ class MockHashFunction : public storage::internal::HashFunction { (override)); MOCK_METHOD(Status, Update, (std::int64_t, absl::Cord const&, std::uint32_t), (override)); + MOCK_METHOD(absl::optional, CurrentCrc32c, (), + (const, override)); + MOCK_METHOD(void, RestoreCrc32c, (std::uint32_t, std::int64_t), (override)); MOCK_METHOD(storage::internal::HashValues, Finish, (), (override)); };