Skip to content

Commit a0ddfb7

Browse files
authored
CXXCBC-757: Replica get/lookup_in - add sub-op spans & parent span (#866)
1 parent 00ef6e6 commit a0ddfb7

File tree

6 files changed

+369
-70
lines changed

6 files changed

+369
-70
lines changed

core/impl/collection.cxx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
308308
},
309309
options.timeout,
310310
options.read_preference,
311+
span,
311312
};
312313
return core_.execute(
313314
std::move(request),
@@ -340,6 +341,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
340341
},
341342
options.timeout,
342343
options.read_preference,
344+
span,
343345
};
344346
return core_.execute(
345347
std::move(request),
@@ -380,7 +382,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
380382
options.durability_level,
381383
options.timeout,
382384
{ options.retry_strategy },
383-
options.parent_span,
385+
span,
384386
};
385387
return core_.execute(
386388
std::move(request),
@@ -405,7 +407,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
405407
durability_level::none,
406408
options.timeout,
407409
{ options.retry_strategy },
408-
options.parent_span,
410+
span,
409411
};
410412
return core_.execute(
411413
std::move(request),
@@ -464,7 +466,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
464466
static_cast<uint32_t>(lock_duration.count()),
465467
options.timeout,
466468
{ options.retry_strategy },
467-
options.parent_span,
469+
span,
468470
};
469471
core_.execute(
470472
std::move(request),
@@ -500,7 +502,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
500502
cas,
501503
options.timeout,
502504
{ options.retry_strategy },
503-
options.parent_span,
505+
span,
504506
};
505507
core_.execute(std::move(request),
506508
[span = std::move(span), handler = std::move(handler)](auto&& resp) mutable {
@@ -530,7 +532,7 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
530532
{},
531533
options.timeout,
532534
{ options.retry_strategy },
533-
options.parent_span,
535+
span,
534536
};
535537
core_.execute(
536538
std::move(request),

core/operations/document_get_all_replicas.hxx

Lines changed: 73 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "core/operations/document_get.hxx"
2727
#include "core/operations/operation_traits.hxx"
2828
#include "core/public_fwd.hxx"
29+
#include "core/tracing/constants.hxx"
2930
#include "core/utils/movable_function.hxx"
3031

3132
#include <memory>
@@ -67,6 +68,7 @@ struct get_all_replicas_request {
6768
id = id,
6869
timeout = timeout,
6970
read_preference = read_preference,
71+
parent_span = std::move(parent_span),
7072
h = std::forward<Handler>(handler)](
7173
std::error_code ec, std::shared_ptr<topology::configuration> config) mutable {
7274
if (ec) {
@@ -107,11 +109,40 @@ struct get_all_replicas_request {
107109
auto ctx = std::make_shared<replica_context>(std::move(h), nodes.size());
108110

109111
for (const auto& node : nodes) {
112+
auto subop_span = core->tracer()->create_span(
113+
node.is_replica ? tracing::operation::mcbp_get_replica : tracing::operation::mcbp_get,
114+
parent_span);
115+
116+
if (subop_span->uses_tags()) {
117+
subop_span->add_tag(tracing::attributes::op::service, tracing::service::key_value);
118+
subop_span->add_tag(tracing::attributes::op::operation_name,
119+
node.is_replica ? tracing::operation::mcbp_get_replica
120+
: tracing::operation::mcbp_get);
121+
subop_span->add_tag(tracing::attributes::op::bucket_name, id.bucket());
122+
subop_span->add_tag(tracing::attributes::op::scope_name, id.scope());
123+
subop_span->add_tag(tracing::attributes::op::collection_name, id.collection());
124+
}
125+
110126
if (node.is_replica) {
111127
document_id replica_id{ id };
112128
replica_id.node_index(node.index);
113129
core->execute(
114-
impl::get_replica_request{ std::move(replica_id), timeout }, [ctx](auto&& resp) {
130+
impl::get_replica_request{
131+
std::move(replica_id),
132+
timeout,
133+
{},
134+
{},
135+
{},
136+
subop_span,
137+
},
138+
[ctx, subop_span](auto&& resp) {
139+
{
140+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
141+
subop_span->add_tag(tracing::attributes::op::retry_count,
142+
resp.ctx.retry_attempts());
143+
}
144+
subop_span->end();
145+
}
115146
handler_type local_handler{};
116147
{
117148
std::scoped_lock lock(ctx->mutex_);
@@ -142,36 +173,52 @@ struct get_all_replicas_request {
142173
}
143174
});
144175
} else {
145-
core->execute(get_request{ document_id{ id }, {}, {}, timeout }, [ctx](auto&& resp) {
146-
handler_type local_handler{};
147-
{
148-
std::scoped_lock lock(ctx->mutex_);
149-
if (ctx->done_) {
150-
return;
176+
core->execute(
177+
get_request{
178+
document_id{ id },
179+
{},
180+
{},
181+
timeout,
182+
{},
183+
subop_span,
184+
},
185+
[ctx, subop_span](auto&& resp) {
186+
{
187+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
188+
subop_span->add_tag(tracing::attributes::op::retry_count,
189+
resp.ctx.retry_attempts());
190+
}
191+
subop_span->end();
151192
}
152-
--ctx->expected_responses_;
153-
if (resp.ctx.ec()) {
154-
if (ctx->expected_responses_ > 0) {
155-
// just ignore the response
193+
handler_type local_handler{};
194+
{
195+
std::scoped_lock lock(ctx->mutex_);
196+
if (ctx->done_) {
156197
return;
157198
}
158-
} else {
159-
ctx->result_.emplace_back(get_all_replicas_response::entry{
160-
std::move(resp.value), resp.cas, resp.flags, false /* active */ });
161-
}
162-
if (ctx->expected_responses_ == 0) {
163-
ctx->done_ = true;
164-
std::swap(local_handler, ctx->handler_);
199+
--ctx->expected_responses_;
200+
if (resp.ctx.ec()) {
201+
if (ctx->expected_responses_ > 0) {
202+
// just ignore the response
203+
return;
204+
}
205+
} else {
206+
ctx->result_.emplace_back(get_all_replicas_response::entry{
207+
std::move(resp.value), resp.cas, resp.flags, false /* active */ });
208+
}
209+
if (ctx->expected_responses_ == 0) {
210+
ctx->done_ = true;
211+
std::swap(local_handler, ctx->handler_);
212+
}
165213
}
166-
}
167-
if (local_handler) {
168-
if (ctx->result_.empty()) {
169-
// Return an error only when we have no results from any replica.
170-
return local_handler({ std::move(resp.ctx), {} });
214+
if (local_handler) {
215+
if (ctx->result_.empty()) {
216+
// Return an error only when we have no results from any replica.
217+
return local_handler({ std::move(resp.ctx), {} });
218+
}
219+
return local_handler({ {}, std::move(ctx->result_) });
171220
}
172-
return local_handler({ {}, std::move(ctx->result_) });
173-
}
174-
});
221+
});
175222
}
176223
}
177224
});

core/operations/document_get_any_replica.hxx

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ struct get_any_replica_request {
5252
core::document_id id;
5353
std::optional<std::chrono::milliseconds> timeout{};
5454
couchbase::read_preference read_preference{ couchbase::read_preference::no_preference };
55+
std::shared_ptr<couchbase::tracing::request_span> parent_span{ nullptr };
5556

5657
template<typename Core, typename Handler>
5758
void execute(Core core, Handler handler)
@@ -62,6 +63,7 @@ struct get_any_replica_request {
6263
id = id,
6364
timeout = timeout,
6465
read_preference = read_preference,
66+
parent_span = std::move(parent_span),
6567
h = std::forward<Handler>(handler)](
6668
std::error_code ec, std::shared_ptr<topology::configuration> config) mutable {
6769
const auto [e, origin] = core->origin();
@@ -118,18 +120,43 @@ struct get_any_replica_request {
118120
auto ctx = std::make_shared<replica_context>(std::move(h), nodes.size());
119121

120122
for (const auto& node : nodes) {
123+
auto subop_span = core->tracer()->create_span(
124+
node.is_replica ? tracing::operation::mcbp_get_replica : tracing::operation::mcbp_get,
125+
parent_span);
126+
127+
if (subop_span->uses_tags()) {
128+
subop_span->add_tag(tracing::attributes::op::service, tracing::service::key_value);
129+
subop_span->add_tag(tracing::attributes::op::operation_name,
130+
node.is_replica ? tracing::operation::mcbp_get_replica
131+
: tracing::operation::mcbp_get);
132+
subop_span->add_tag(tracing::attributes::op::bucket_name, id.bucket());
133+
subop_span->add_tag(tracing::attributes::op::scope_name, id.scope());
134+
subop_span->add_tag(tracing::attributes::op::collection_name, id.collection());
135+
}
136+
121137
if (node.is_replica) {
122138
document_id replica_id{ id };
123139
replica_id.node_index(node.index);
124140
impl::with_cancellation<impl::get_replica_request> req{
125141
{
126142
std::move(replica_id),
127143
timeout,
144+
{},
145+
{},
146+
{},
147+
subop_span,
128148
},
129149
};
130150
ctx->add_cancellation_token(req.cancel_token);
131151
core->execute(
132-
std::move(req), [ctx](auto&& resp) {
152+
std::move(req), [ctx, subop_span](auto&& resp) {
153+
{
154+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
155+
subop_span->add_tag(tracing::attributes::op::retry_count,
156+
resp.ctx.retry_attempts());
157+
}
158+
subop_span->end();
159+
}
133160
handler_type local_handler;
134161
std::vector<std::shared_ptr<impl::cancellation_token>> cancel_tokens;
135162
{
@@ -165,10 +192,19 @@ struct get_any_replica_request {
165192
{},
166193
{},
167194
timeout,
195+
{},
196+
subop_span,
168197
},
169198
};
170199
ctx->add_cancellation_token(req.cancel_token);
171-
core->execute(std::move(req), [ctx](auto&& resp) {
200+
core->execute(std::move(req), [ctx, subop_span](auto&& resp) {
201+
{
202+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
203+
subop_span->add_tag(tracing::attributes::op::retry_count,
204+
resp.ctx.retry_attempts());
205+
}
206+
subop_span->end();
207+
}
172208
handler_type local_handler{};
173209
std::vector<std::shared_ptr<impl::cancellation_token>> cancel_tokens;
174210
{

core/operations/document_lookup_in_all_replicas.hxx

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "core/impl/subdoc/command.hxx"
2424
#include "core/operations/document_lookup_in.hxx"
2525
#include "core/operations/operation_traits.hxx"
26+
#include "core/tracing/constants.hxx"
2627
#include "core/utils/movable_function.hxx"
2728

2829
#include <couchbase/codec/encoded_value.hxx>
@@ -152,14 +153,39 @@ struct lookup_in_all_replicas_request {
152153
auto ctx = std::make_shared<replica_context>(std::move(h), nodes.size());
153154

154155
for (const auto& node : nodes) {
156+
auto subop_span = core->tracer()->create_span(
157+
node.is_replica ? tracing::operation::mcbp_lookup_in_replica
158+
: tracing::operation::mcbp_lookup_in,
159+
parent_span);
160+
161+
if (subop_span->uses_tags()) {
162+
subop_span->add_tag(tracing::attributes::op::service, tracing::service::key_value);
163+
subop_span->add_tag(tracing::attributes::op::operation_name,
164+
node.is_replica ? tracing::operation::mcbp_lookup_in_replica
165+
: tracing::operation::mcbp_lookup_in);
166+
subop_span->add_tag(tracing::attributes::op::bucket_name, id.bucket());
167+
subop_span->add_tag(tracing::attributes::op::scope_name, id.scope());
168+
subop_span->add_tag(tracing::attributes::op::collection_name, id.collection());
169+
}
170+
155171
if (node.is_replica) {
156172
document_id replica_id{ id };
157173
replica_id.node_index(node.index);
158174
auto replica_req = impl::lookup_in_replica_request{
159-
std::move(replica_id), specs, timeout, parent_span
175+
std::move(replica_id),
176+
specs,
177+
timeout,
178+
subop_span,
160179
};
161180
replica_req.access_deleted = access_deleted;
162-
core->execute(replica_req, [ctx](auto&& resp) {
181+
core->execute(replica_req, [ctx, subop_span](auto&& resp) {
182+
{
183+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
184+
subop_span->add_tag(tracing::attributes::op::retry_count,
185+
resp.ctx.retry_attempts());
186+
}
187+
subop_span->end();
188+
}
163189
handler_type local_handler{};
164190
{
165191
std::scoped_lock lock(ctx->mutex_);
@@ -206,8 +232,24 @@ struct lookup_in_all_replicas_request {
206232
});
207233
} else {
208234
core->execute(
209-
lookup_in_request{ document_id{ id }, {}, {}, false, specs, timeout },
210-
[ctx](auto&& resp) {
235+
lookup_in_request{
236+
document_id{ id },
237+
{},
238+
{},
239+
false,
240+
specs,
241+
timeout,
242+
{},
243+
subop_span,
244+
},
245+
[ctx, subop_span](auto&& resp) {
246+
{
247+
if (subop_span->uses_tags() && resp.ctx.retry_attempts() > 0) {
248+
subop_span->add_tag(tracing::attributes::op::retry_count,
249+
resp.ctx.retry_attempts());
250+
}
251+
subop_span->end();
252+
}
211253
handler_type local_handler{};
212254
{
213255
std::scoped_lock lock(ctx->mutex_);

0 commit comments

Comments
 (0)