Skip to content
Open
19 changes: 15 additions & 4 deletions google/cloud/storage/async/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,31 @@ future<StatusOr<AsyncToken>> AsyncWriter::Write(AsyncToken token,

future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
AsyncToken token, WritePayload payload) {
return Finalize(std::move(token), std::move(payload), absl::nullopt);
}

future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
AsyncToken token, WritePayload payload,
absl::optional<Crc32cChecksumValue> const& expected_checksum) {
if (!impl_) return StreamError<google::storage::v2::Object>(GCP_ERROR_INFO());
auto t = storage_internal::MakeAsyncToken(impl_.get());
if (token != t) {
return TokenError<google::storage::v2::Object>(GCP_ERROR_INFO());
}

return impl_->Finalize(std::move(payload)).then([impl = impl_](auto f) {
return f.get();
});
return impl_->Finalize(std::move(payload), expected_checksum)
.then([impl = impl_](auto f) { return f.get(); });
}

future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
AsyncToken token) {
return Finalize(std::move(token), WritePayload{});
return Finalize(std::move(token), WritePayload{}, absl::nullopt);
}

future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
AsyncToken token,
absl::optional<Crc32cChecksumValue> const& expected_checksum) {
return Finalize(std::move(token), WritePayload{}, expected_checksum);
}

future<Status> AsyncWriter::Flush() {
Expand Down
7 changes: 7 additions & 0 deletions google/cloud/storage/async/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/storage/async/token.h"
#include "google/cloud/storage/async/write_payload.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/hashing_options.h"
#include "google/cloud/future.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
Expand Down Expand Up @@ -116,12 +117,18 @@ class AsyncWriter {

/// Finalize the upload with the existing data.
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token);
future<StatusOr<google::storage::v2::Object>> Finalize(
AsyncToken token,
absl::optional<Crc32cChecksumValue> const& expected_checksum);

/**
* Upload @p payload and then finalize the upload.
*/
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token,
WritePayload payload);
future<StatusOr<google::storage::v2::Object>> Finalize(
AsyncToken token, WritePayload payload,
absl::optional<Crc32cChecksumValue> const& expected_checksum);

/**
* Flush any buffered data to the service.
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/storage/async/writer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_WRITER_CONNECTION_H

#include "google/cloud/storage/async/write_payload.h"
#include "google/cloud/storage/hashing_options.h"
#include "google/cloud/storage/object_metadata.h"
#include "google/cloud/future.h"
#include "google/cloud/rpc_metadata.h"
Expand Down Expand Up @@ -112,6 +113,11 @@ class AsyncWriterConnection {
/// Finalizes an upload.
virtual future<StatusOr<google::storage::v2::Object>> Finalize(
WritePayload) = 0;
virtual future<StatusOr<google::storage::v2::Object>> Finalize(
WritePayload p,
absl::optional<Crc32cChecksumValue> const& /*expected_checksum*/) {
return Finalize(std::move(p));
}

/// Uploads some data to the service and flushes the value.
virtual future<Status> Flush(WritePayload payload) = 0;
Expand Down
45 changes: 39 additions & 6 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ std::unique_ptr<storage::internal::HashFunction> CreateHashFunction(
return storage::internal::CreateNullHashFunction();
}

std::shared_ptr<storage::internal::HashFunction> CreateAppendableHashFunction(
Options const& options) {
auto crc32c = std::unique_ptr<storage::internal::HashFunction>();
if (options.has<storage::UseCrc32cValueOption>()) {
crc32c = std::make_unique<storage::internal::PrecomputedHashFunction>(
storage::internal::HashValues{
Crc32cFromProto(options.get<storage::UseCrc32cValueOption>()),
/*.md5=*/{}});
} else if (options.get<storage::EnableCrc32cValidationOption>()) {
crc32c = std::make_unique<storage::internal::Crc32cHashFunction>();
}

auto md5 = std::unique_ptr<storage::internal::HashFunction>();
if (options.has<storage::UseMD5ValueOption>()) {
md5 = std::make_unique<storage::internal::PrecomputedHashFunction>(
storage::internal::HashValues{
/*.crc32=*/{},
MD5FromProto(options.get<storage::UseMD5ValueOption>())});
}

if (crc32c && md5) {
return std::make_shared<storage::internal::CompositeFunction>(
std::move(crc32c), std::move(md5));
}
if (crc32c) return crc32c;
if (md5) return md5;
return storage::internal::CreateNullHashFunction();
}

StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> MakeAppendableWriter(
google::cloud::internal::ImmutableOptions const& current,
google::storage::v2::BidiWriteObjectRequest request,
Expand All @@ -125,19 +154,23 @@ StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> MakeAppendableWriter(

if (rpc->first_response.has_resource()) {
auto const& resource = rpc->first_response.resource();
hash = CreateAppendableHashFunction(*current);
if (current->get<storage::EnableCrc32cValidationOption>() &&
resource.has_checksums() && resource.checksums().has_crc32c()) {
hash = std::make_shared<
::google::cloud::storage::internal::Crc32cHashFunction>(
resource.checksums().crc32c(), resource.size());
} else {
hash = CreateHashFunction(*current);
hash->RestoreCrc32c(resource.checksums().crc32c(), resource.size());
}
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, resource, false);
} else {
persisted_size = rpc->first_response.persisted_size();
hash = CreateHashFunction(*current);
hash = CreateAppendableHashFunction(*current);
if (current->get<storage::EnableCrc32cValidationOption>() &&
rpc->first_response.has_persisted_data_checksums() &&
rpc->first_response.persisted_data_checksums().has_crc32c()) {
hash->RestoreCrc32c(
rpc->first_response.persisted_data_checksums().crc32c(),
persisted_size);
}
auto checksums = rpc->first_response.has_persisted_data_checksums()
? absl::make_optional(
rpc->first_response.persisted_data_checksums())
Expand Down
Loading
Loading