@@ -670,15 +670,21 @@ attempt_context_impl::replace(const transaction_get_result& document,
670670 self, " found existing INSERT of {} while replacing" , document);
671671 self->create_staged_insert (document.id (),
672672 std::move (content),
673- existing_sm->doc (). cas ().value (),
673+ existing_sm->cas ().value (),
674674 exp_delay (std::chrono::milliseconds (5 ),
675675 std::chrono::milliseconds (300 ),
676676 self->overall ()->config ().timeout ),
677677 op_id,
678678 std::move (cb));
679679 return ;
680680 }
681- self->create_staged_replace (document, std::move (content), op_id, std::move (cb));
681+ self->create_staged_replace (document.id (),
682+ std::move (content),
683+ document.content ().flags ,
684+ document.cas (),
685+ op_id,
686+ document.metadata (),
687+ std::move (cb));
682688 });
683689 });
684690 } catch (const client_error& e) {
@@ -714,15 +720,19 @@ external_exception_from_response(const Response& resp) -> external_exception
714720
715721template <typename Handler>
716722void
717- attempt_context_impl::create_staged_replace (const transaction_get_result& document,
718- codec::encoded_value content,
719- const std::string& op_id,
720- Handler&& cb)
723+ attempt_context_impl::create_staged_replace (
724+ const document_id& id,
725+ codec::encoded_value content,
726+ std::uint32_t original_flags,
727+ const couchbase::cas& cas,
728+ const std::string& op_id,
729+ const std::optional<document_metadata>& document_metadata,
730+ Handler&& cb)
721731{
722- operations::mutate_in_request req{ document. id () };
732+ operations::mutate_in_request req{ id };
723733 const bool binary =
724734 codec::codec_flags::has_common_flags (content.flags , codec::codec_flags::binary_common_flags);
725- auto txn = create_document_metadata (" replace" , op_id, document. metadata () , content.flags );
735+ auto txn = create_document_metadata (" replace" , op_id, document_metadata , content.flags );
726736 req.specs =
727737 mutate_in_specs{
728738 mutate_in_specs::upsert_raw (" txn" , utils::to_binary (jsonify (txn))).xattr ().create_path (),
@@ -735,8 +745,8 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
735745 }
736746 .specs ();
737747 req.durability_level = overall ()->config ().level ;
738- req.cas = document. cas () ;
739- req.flags = document. content (). flags ;
748+ req.cas = cas;
749+ req.flags = original_flags ;
740750 req.access_deleted = true ;
741751 auto error_handler = [self = shared_from_this ()](error_class ec,
742752 external_exception cause,
@@ -757,25 +767,26 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
757767 return self->op_completed_with_error (std::forward<Handler>(cb), err);
758768 }
759769 };
760- auto ec =
761- wait_for_hook ([self = shared_from_this (), key = document.id ().key ()](auto handler) mutable {
762- return self->hooks_ .before_staged_replace (self, key, std::move (handler));
763- });
770+ auto ec = wait_for_hook ([self = shared_from_this (), key = id.key ()](auto handler) mutable {
771+ return self->hooks_ .before_staged_replace (self, key, std::move (handler));
772+ });
764773 if (ec) {
765774 return error_handler (
766775 *ec, UNKNOWN, " before_staged_replace hook raised error" , std::forward<Handler>(cb));
767776 }
768777 CB_ATTEMPT_CTX_LOG_TRACE (this ,
769778 " about to replace doc {} with cas {} in txn {}" ,
770- document. id () ,
771- document. cas () .value (),
779+ id ,
780+ cas.value (),
772781 overall ()->transaction_id ());
773782 overall ()->cluster_ref ().execute (
774783 req,
775784 [self = shared_from_this (),
776- operation_id = op_id,
777- document,
785+ op_id,
786+ id,
787+ document_metadata,
778788 content = std::move (content),
789+ original_flags,
779790 cb = std::forward<Handler>(cb),
780791 error_handler = std::move (error_handler)](core::operations::mutate_in_response resp) mutable {
781792 if (auto ec2 = error_class_from_response (resp); ec2) {
@@ -787,11 +798,13 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
787798 }
788799 return self->hooks_ .after_staged_replace_complete (
789800 self,
790- document. id () .key (),
801+ id .key (),
791802 [self,
792- operation_id,
793- document,
803+ op_id,
804+ id,
805+ document_metadata,
794806 content = std::move (content),
807+ original_flags,
795808 error_handler = std::move (error_handler),
796809 cb = std::forward<Handler>(cb),
797810 resp = std::move (resp)](auto ec) mutable {
@@ -812,17 +825,17 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
812825 staged_content_binary = content;
813826 }
814827 transaction_get_result out{
815- document. id () ,
828+ id ,
816829 content,
817830 resp.cas .value (),
818831 transaction_links{
819832 self->atr_id_ ->key (),
820- document. id () .bucket (),
821- document. id () .scope (),
822- document. id () .collection (),
833+ id .bucket (),
834+ id .scope (),
835+ id .collection (),
823836 self->overall ()->transaction_id (),
824837 self->id (),
825- operation_id ,
838+ op_id ,
826839 std::move (staged_content_json),
827840 std::move (staged_content_binary),
828841 std::nullopt ,
@@ -833,17 +846,17 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
833846 std::nullopt ,
834847 false ,
835848 },
836- document. metadata () ,
849+ document_metadata ,
837850 };
838851
839852 CB_ATTEMPT_CTX_LOG_TRACE (self, " replace staged content, result {}" , out);
840853
841854 self->supports_replace_body_with_xattr (
842- document .bucket (),
855+ id .bucket (),
843856 [self,
844857 out = std::move (out),
845858 error_handler = std::move (error_handler),
846- current_flags = document. content (). flags ,
859+ original_flags ,
847860 cb = std::forward<Handler>(cb)](auto ec, bool supports) mutable {
848861 if (ec) {
849862 return error_handler (
@@ -853,17 +866,19 @@ attempt_context_impl::create_staged_replace(const transaction_get_result& docume
853866 std::forward<Handler>(cb));
854867 }
855868
869+ auto [staged_content, staged_flags] = out.links ().staged_content_json_or_binary ();
870+
856871 self->staged_mutations_ ->add (staged_mutation{
857- out,
858- (supports)
859- ? std::nullopt
860- : std::make_optional (
861- out.links ()
862- .staged_content_json_or_binary ()), // We don't store the staged contents if
863- // the cluster supports
864- // replace_body_with_xattr
865872 staged_mutation_type::REPLACE,
866- current_flags,
873+ out.id (),
874+ out.cas (),
875+ supports ? std::nullopt
876+ : std::make_optional (staged_content), // We don't store the staged contents
877+ // if the cluster supports
878+ // replace_body_with_xattr
879+ staged_flags,
880+ original_flags,
881+ out.metadata (),
867882 });
868883 return self->op_completed_with_callback (std::forward<Handler>(cb),
869884 std::optional (out));
@@ -1002,8 +1017,13 @@ attempt_context_impl::insert(const core::document_id& id,
10021017 }
10031018 if (existing_sm != nullptr && existing_sm->type () == staged_mutation_type::REMOVE) {
10041019 CB_ATTEMPT_CTX_LOG_DEBUG (self, " found existing remove of {} while inserting" , id);
1005- return self->create_staged_replace (
1006- existing_sm->doc (), std::move (content), op_id, std::move (cb));
1020+ return self->create_staged_replace (existing_sm->id (),
1021+ std::move (content),
1022+ existing_sm->current_user_flags (),
1023+ existing_sm->cas (),
1024+ op_id,
1025+ existing_sm->doc_metadata (),
1026+ std::move (cb));
10071027 }
10081028 const std::uint64_t cas = 0 ;
10091029 self->create_staged_insert (id,
@@ -1273,11 +1293,16 @@ attempt_context_impl::remove(const transaction_get_result& document, VoidCallbac
12731293 document.id (),
12741294 resp.cas .value (),
12751295 resp.ctx .ec ().message ());
1276- // TODO(SA): this copy... can we do better?
1277- transaction_get_result new_res = document;
1278- new_res.cas (resp.cas .value ());
1279- self->staged_mutations_ ->add (staged_mutation (
1280- new_res, {}, staged_mutation_type::REMOVE, document.content ().flags ));
1296+
1297+ self->staged_mutations_ ->add (staged_mutation{
1298+ staged_mutation_type::REMOVE,
1299+ document.id (),
1300+ resp.cas ,
1301+ {},
1302+ document.content ().flags ,
1303+ document.content ().flags ,
1304+ document.metadata (),
1305+ });
12811306 return self->op_completed_with_callback (cb);
12821307 });
12831308 });
@@ -1434,11 +1459,11 @@ attempt_context_impl::query_begin_work(const std::optional<std::string>& query_c
14341459 if (!staged_mutations_->empty ()) {
14351460 staged_mutations_->iterate([&mutations](staged_mutation& mut) {
14361461 mutations.push_back (tao::json::value{
1437- { " scp" , mut.doc (). id ().scope () },
1438- { " coll" , mut.doc (). id ().collection () },
1439- { " bkt" , mut.doc (). id ().bucket () },
1440- { " id" , mut.doc (). id ().key () },
1441- { " cas" , std::to_string (mut.doc (). cas ().value ()) },
1462+ { " scp" , mut.id ().scope () },
1463+ { " coll" , mut.id ().collection () },
1464+ { " bkt" , mut.id ().bucket () },
1465+ { " id" , mut.id ().key () },
1466+ { " cas" , std::to_string (mut.cas ().value ()) },
14421467 { " type" , mut.type_as_string () },
14431468 });
14441469 });
@@ -2966,11 +2991,18 @@ attempt_context_impl::do_get(const core::document_id& id,
29662991 // Check if we already have a staged insert/replace for this document AND we have the content
29672992 // for it (i.e. the cluster does not support replace body_with_xattr)
29682993 if (const staged_mutation* own_write = check_for_own_write (id); own_write != nullptr ) {
2969- if (auto own_write_content = own_write->content (); own_write_content.has_value ()) {
2994+ const auto own_write_content = own_write->staged_content ();
2995+ if (own_write_content.has_value ()) {
29702996 CB_ATTEMPT_CTX_LOG_DEBUG (this , " found own-write of mutated doc {}" , id);
29712997 return cb (std::nullopt ,
29722998 std::nullopt ,
2973- transaction_get_result::create_from (own_write->doc (), own_write_content.value ()));
2999+ transaction_get_result{
3000+ own_write->id (),
3001+ codec::encoded_value{ own_write_content.value (), own_write->staged_flags () },
3002+ own_write->cas ().value (),
3003+ {},
3004+ {},
3005+ });
29743006 }
29753007 }
29763008 if (const staged_mutation* own_remove = staged_mutations_->find_remove (id);
@@ -3332,8 +3364,15 @@ attempt_context_impl::create_staged_insert_error_handler(const core::document_id
33323364 // this is us dealing with resolving an ambiguity. So, lets
33333365 // just update the staged_mutation with the correct cas and
33343366 // continue...
3335- self->staged_mutations_ ->add (staged_mutation (
3336- *doc, content, staged_mutation_type::INSERT, content.flags ));
3367+ self->staged_mutations_ ->add (staged_mutation{
3368+ staged_mutation_type::INSERT,
3369+ doc->id (),
3370+ doc->cas (),
3371+ content.data ,
3372+ content.flags ,
3373+ doc->content ().flags ,
3374+ doc->metadata (),
3375+ });
33373376 return self->op_completed_with_callback (std::forward<Handler>(cb), doc);
33383377 }
33393378 return self->op_completed_with_error (
@@ -3557,17 +3596,19 @@ attempt_context_impl::create_staged_insert(const core::document_id& id,
35573596 std::nullopt ,
35583597 };
35593598
3599+ auto [staged_content, staged_flags] = out.links ().staged_content_json_or_binary ();
3600+
35603601 self->staged_mutations_ ->add (staged_mutation{
3561- out,
3562- supports
3563- ? std::nullopt
3564- : std::make_optional (
3565- out.links ()
3566- .staged_content_json_or_binary ()), // We don't store the staged contents if
3567- // the cluster supports
3568- // replace_body_with_xattr
35693602 staged_mutation_type::INSERT,
3570- out.links ().staged_content_json_or_binary ().flags ,
3603+ id,
3604+ resp.cas ,
3605+ supports ? std::nullopt
3606+ : std::make_optional (staged_content), // We don't store the staged contents
3607+ // if the cluster supports
3608+ // replace_body_with_xattr
3609+ staged_flags,
3610+ staged_flags,
3611+ out.metadata (),
35713612 });
35723613 return self->op_completed_with_callback (std::forward<Handler>(cb),
35733614 std::optional{ std::move (out) });
0 commit comments