diff --git a/BUILD.bazel b/BUILD.bazel
index 5c5b72a5..bc0fa694 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -17,6 +17,8 @@ cc_library(
         "src/datadog/datadog_agent.cpp",
         "src/datadog/datadog_agent.h",
         "src/datadog/datadog_agent_config.cpp",
+        "src/datadog/ddsketch.cpp",
+        "src/datadog/ddsketch.h",
         "src/datadog/default_http_client.h",
         "src/datadog/default_http_client_null.cpp",
         "src/datadog/endpoint_inferral.cpp",
@@ -52,6 +54,8 @@ cc_library(
         "src/datadog/runtime_id.cpp",
        "src/datadog/sampling_util.h",
        "src/datadog/span.cpp",
+        "src/datadog/stats_concentrator.cpp",
+        "src/datadog/stats_concentrator.h",
        "src/datadog/span_data.cpp",
        "src/datadog/span_data.h",
        "src/datadog/span_matcher.cpp",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8cc154f1..f84f7243 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -184,6 +184,8 @@ target_sources(dd-trace-cpp-objects
     src/datadog/collector_response.cpp
     src/datadog/datadog_agent_config.cpp
     src/datadog/datadog_agent.cpp
+    src/datadog/ddsketch.cpp
+    src/datadog/stats_concentrator.cpp
     src/datadog/endpoint_inferral.cpp
     src/datadog/environment.cpp
     src/datadog/error.cpp
diff --git a/include/datadog/datadog_agent_config.h b/include/datadog/datadog_agent_config.h
index a324f44f..f6524d46 100644
--- a/include/datadog/datadog_agent_config.h
+++ b/include/datadog/datadog_agent_config.h
@@ -65,6 +65,16 @@ struct DatadogAgentConfig {
   // How often, in seconds, to query the Datadog Agent for remote configuration
   // updates.
   Optional<double> remote_configuration_poll_interval_seconds;
+
+  // Whether the tracer should compute trace stats locally instead of relying
+  // on the Datadog Agent. When enabled, the tracer aggregates span metrics
+  // (hits, errors, duration distributions) and sends them to the Agent via
+  // POST /v0.6/stats. The Agent can then drop priority-0 traces without
+  // losing service-level metrics.
+  //
+  // Overridden by the DD_TRACE_STATS_COMPUTATION_ENABLED environment variable.
+  // Default: false.
+  Optional<bool> stats_computation_enabled;
 };
 
 class FinalizedDatadogAgentConfig {
diff --git a/include/datadog/environment.h b/include/datadog/environment.h
index f2846b37..f82f5137 100644
--- a/include/datadog/environment.h
+++ b/include/datadog/environment.h
@@ -81,7 +81,8 @@ namespace environment {
   MACRO(DD_APM_TRACING_ENABLED, BOOLEAN, true)                                \
   MACRO(DD_TRACE_RESOURCE_RENAMING_ENABLED, BOOLEAN, false)                   \
   MACRO(DD_TRACE_RESOURCE_RENAMING_ALWAYS_SIMPLIFIED_ENDPOINT, BOOLEAN, false) \
-  MACRO(DD_EXTERNAL_ENV, STRING, "")
+  MACRO(DD_EXTERNAL_ENV, STRING, "")                                          \
+  MACRO(DD_TRACE_STATS_COMPUTATION_ENABLED, BOOLEAN, false)
 
 #define ENV_DEFAULT_RESOLVED_IN_CODE(X) X
 #define WITH_COMMA(ARG, TYPE, DEFAULT_VALUE) ARG,
diff --git a/src/datadog/datadog_agent.cpp b/src/datadog/datadog_agent.cpp
index e761d8fe..e2b736f8 100644
--- a/src/datadog/datadog_agent.cpp
+++ b/src/datadog/datadog_agent.cpp
@@ -19,6 +19,7 @@
 #include "msgpack.h"
 #include "platform_util.h"
 #include "span_data.h"
+#include "stats_concentrator.h"
 #include "telemetry_metrics.h"
 #include "trace_sampler.h"
 
@@ -169,6 +170,16 @@ DatadogAgent::DatadogAgent(
       tracer_signature.library_version);
   if (config.stats_computation_enabled) {
     headers_.emplace("Datadog-Client-Computed-Stats", "yes");
+    headers_.emplace("Datadog-Client-Computed-Top-Level", "yes");
+
+    // Create the stats concentrator for Client-Side Stats Computation.
+    stats_concentrator_ = std::make_unique<StatsConcentrator>(
+        config.http_client, config.url, logger,
+        /*hostname=*/"", /*env=*/"", /*version=*/"",
+        /*service=*/std::string(tracer_signature.default_service),
+        /*lang=*/"cpp",
+        /*tracer_version=*/tracer_signature.library_version,
+        /*runtime_id=*/tracer_signature.runtime_id.string());
   }
 
   // Origin Detection headers are not necessary when Unix Domain Socket (UDS)
@@ -209,12 +220,26 @@
 DatadogAgent::~DatadogAgent() {
   flush();
 
+  // Flush any remaining stats at shutdown, before draining the HTTP client,
+  // so the final stats request is included in the drain.
+  if (stats_concentrator_) {
+    stats_concentrator_->flush_all();
+  }
+
   http_client_->drain(deadline);
 }
 
 Expected<void> DatadogAgent::send(
     std::vector<std::unique_ptr<SpanData>>&& spans,
    const std::shared_ptr<TraceSampler>& response_handler) {
+  // Feed eligible spans to the stats concentrator (if enabled).
+  if (stats_concentrator_) {
+    for (const auto& span_ptr : spans) {
+      if (span_ptr) {
+        stats_concentrator_->add(*span_ptr);
+      }
+    }
+  }
+
   std::lock_guard<std::mutex> lock(mutex_);
   trace_chunks_.push_back(TraceChunk{std::move(spans), response_handler});
   return nullopt;
 }
@@ -368,6 +393,11 @@
     logger_->log_error(
         error->with_prefix("Unexpected error submitting traces: "));
   }
+
+  // Also flush stats if Client-Side Stats is enabled.
+  if (stats_concentrator_) {
+    stats_concentrator_->flush(clock_());
+  }
 }
 
 void DatadogAgent::get_and_apply_remote_configuration_updates() {
diff --git a/src/datadog/datadog_agent.h b/src/datadog/datadog_agent.h
index fe016c46..2dded736 100644
--- a/src/datadog/datadog_agent.h
+++ b/src/datadog/datadog_agent.h
@@ -17,6 +17,7 @@
 #include <vector>
 
 #include "remote_config/remote_config.h"
+#include "stats_concentrator.h"
 
 namespace datadog {
 namespace tracing {
@@ -52,6 +53,10 @@ class DatadogAgent : public Collector {
 
   std::unordered_map<std::string, std::string> headers_;
 
+  // Client-Side Stats Computation: when enabled, this concentrator aggregates
+  // span metrics and flushes them to the Agent via POST /v0.6/stats.
+  std::unique_ptr<StatsConcentrator> stats_concentrator_;
+
   void flush();
 
  public:
diff --git a/src/datadog/datadog_agent_config.cpp b/src/datadog/datadog_agent_config.cpp
index 05b3b065..a430aa06 100644
--- a/src/datadog/datadog_agent_config.cpp
+++ b/src/datadog/datadog_agent_config.cpp
@@ -152,8 +152,15 @@ Expected<FinalizedDatadogAgentConfig> finalize_config(
     result.admission_controller_uid = std::string(*external_env);
   }
 
-  // Not supported yet but required for APM tracing disablement.
-  result.stats_computation_enabled = false;
+  // Client-Side Stats Computation. Default: false.
+  // Overridden by DD_TRACE_STATS_COMPUTATION_ENABLED env var.
+  if (auto stats_env =
+          lookup(environment::DD_TRACE_STATS_COMPUTATION_ENABLED)) {
+    result.stats_computation_enabled = !falsy(*stats_env);
+  } else {
+    result.stats_computation_enabled =
+        user_config.stats_computation_enabled.value_or(false);
+  }
 
   return result;
 }
diff --git a/src/datadog/ddsketch.cpp b/src/datadog/ddsketch.cpp
new file mode 100644
index 00000000..4db147b6
--- /dev/null
+++ b/src/datadog/ddsketch.cpp
@@ -0,0 +1,170 @@
+#include "ddsketch.h"
+
+#include <cassert>
+#include <cmath>
+#include <cstddef>
+#include <cstdint>
+
+#include "msgpack.h"
+
+namespace datadog {
+namespace tracing {
+
+DDSketch::DDSketch(double relative_accuracy, std::size_t max_num_bins)
+    : max_num_bins_(max_num_bins) {
+  assert(relative_accuracy > 0.0 && relative_accuracy < 1.0);
+  assert(max_num_bins > 0);
+
+  // gamma = (1 + alpha) / (1 - alpha)
+  gamma_ = (1.0 + relative_accuracy) / (1.0 - relative_accuracy);
+  log_gamma_ = std::log(gamma_);
+}
+
+int DDSketch::key(double value) const {
+  // Map value to a bin index using log(value) / log(gamma).
+  // The index is the ceiling of log_gamma(value).
+  return static_cast<int>(std::ceil(std::log(value) / log_gamma_));
+}
+
+double DDSketch::lower_bound(int index) const {
+  return std::pow(gamma_, index - 1);
+}
+
+void DDSketch::add(double value) {
+  if (value < 0.0) {
+    value = 0.0;
+  }
+
+  count_++;
+  sum_ += value;
+
+  if (count_ == 1) {
+    min_ = value;
+    max_ = value;
+  } else {
+    if (value < min_) min_ = value;
+    if (value > max_) max_ = value;
+  }
+
+  // Values that are effectively zero go into the zero bucket. The threshold
+  // is far below any real duration (values are nanosecond counts), so it only
+  // catches true zeros and denormal-scale inputs, for which `key()` would
+  // produce an extreme negative index.
+  constexpr double min_positive = 1e-9;
+  if (value <= min_positive) {
+    zero_count_++;
+    return;
+  }
+
+  int k = key(value);
+
+  if (bins_.empty()) {
+    min_key_ = k;
+    bins_.push_back(1);
+    return;
+  }
+
+  // Expand bins_ to cover the new key.
+  if (k < min_key_) {
+    // Prepend bins to cover the new (lower) key. If doing so would exceed the
+    // bin limit, the new low bins would immediately be collapsed back into
+    // the lowest kept bin (see below), so count the value there directly.
+    // This also guards against a pathological allocation when `k` is very
+    // negative (i.e. `value` is tiny but above `min_positive`).
+    std::size_t prepend = static_cast<std::size_t>(min_key_ - k);
+    if (prepend + bins_.size() > max_num_bins_) {
+      bins_.front()++;
+    } else {
+      bins_.insert(bins_.begin(), prepend, 0);
+      min_key_ = k;
+      bins_[0] = 1;
+    }
+  } else if (k >= min_key_ + static_cast<int>(bins_.size())) {
+    // Append bins.
+    bins_.resize(static_cast<std::size_t>(k - min_key_ + 1), 0);
+    bins_[static_cast<std::size_t>(k - min_key_)] = 1;
+  } else {
+    bins_[static_cast<std::size_t>(k - min_key_)]++;
+  }
+
+  // If we have exceeded the max number of bins, collapse the *lowest* bins
+  // together: fold bins_[0] into bins_[1] and advance min_key_. This is the
+  // standard "collapsing lowest" dense store: only the smallest (least
+  // interesting) values lose accuracy, and — crucially — the gamma mapping of
+  // every remaining bin is unchanged, so the serialized {gamma, index} pairs
+  // stay valid. (Merging *pairs* of adjacent bins would instead square gamma,
+  // silently invalidating the mapping we serialize.)
+  while (bins_.size() > max_num_bins_) {
+    bins_[1] += bins_[0];
+    bins_.erase(bins_.begin());
+    ++min_key_;
+  }
+}
+
+std::uint64_t DDSketch::count() const { return count_; }
+
+double DDSketch::sum() const { return sum_; }
+
+double DDSketch::min() const { return count_ > 0 ? min_ : 0.0; }
+
+double DDSketch::max() const { return count_ > 0 ? max_ : 0.0; }
+
+double DDSketch::avg() const {
+  return count_ > 0 ? sum_ / static_cast<double>(count_) : 0.0;
+}
+
+bool DDSketch::empty() const { return count_ == 0; }
+
+void DDSketch::clear() {
+  bins_.clear();
+  min_key_ = 0;
+  count_ = 0;
+  zero_count_ = 0;
+  sum_ = 0.0;
+  min_ = 0.0;
+  max_ = 0.0;
+}
+
+void DDSketch::msgpack_encode(std::string& destination) const {
+  // The DDSketch proto message has 3 fields:
+  //   mapping: {gamma, index_offset, interpolation}
+  //   positive_values: {contiguous_bin_counts, contiguous_bin_index_offset}
+  //   zero_count
+  //
+  // We encode this as a msgpack map.
+
+  // Mapping sub-map:
+  //   interpolation: 0 = NONE (logarithmic)
+  //   index_offset: 0 (we use the raw index)
+
+  // clang-format off
+  msgpack::pack_map(destination, 3);
+
+  // 1) "mapping"
+  msgpack::pack_string(destination, "mapping");
+  msgpack::pack_map(destination, 3);
+  msgpack::pack_string(destination, "gamma");
+  msgpack::pack_double(destination, gamma_);
+  msgpack::pack_string(destination, "indexOffset");
+  msgpack::pack_double(destination, 0.0);
+  msgpack::pack_string(destination, "interpolation");
+  msgpack::pack_integer(destination, std::int64_t(0));  // NONE
+
+  // 2) "positiveValues"
+  msgpack::pack_string(destination, "positiveValues");
+  msgpack::pack_map(destination, 2);
+  msgpack::pack_string(destination, "contiguousBinCounts");
+  // Pack as array of doubles (matching the proto float64 repeated field).
+  msgpack::pack_array(destination, bins_.size());
+  for (auto c : bins_) {
+    msgpack::pack_double(destination, static_cast<double>(c));
+  }
+  msgpack::pack_string(destination, "contiguousBinIndexOffset");
+  msgpack::pack_integer(destination, std::int64_t(min_key_));
+
+  // 3) "zeroCount"
+  msgpack::pack_string(destination, "zeroCount");
+  msgpack::pack_double(destination, static_cast<double>(zero_count_));
+  // clang-format on
+}
+
+}  // namespace tracing
+}  // namespace datadog
diff --git a/src/datadog/ddsketch.h b/src/datadog/ddsketch.h
new file mode 100644
index 00000000..6fc997e7
--- /dev/null
+++ b/src/datadog/ddsketch.h
@@ -0,0 +1,90 @@
+#pragma once
+
+// This component provides a minimal DDSketch implementation for the
+// Client-Side Stats feature.
+//
+// DDSketch is a mergeable, relative-error quantile sketch. It maps values to
+// bins using a logarithmic index scheme that guarantees at most `alpha`
+// relative accuracy for any quantile query.
+//
+// This implementation is intentionally limited to what is needed by the stats
+// concentrator: recording non-negative durations (in nanoseconds) and
+// serializing the sketch to msgpack for the /v0.6/stats payload.
+//
+// Reference:
+// https://arxiv.org/abs/1908.10693
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+namespace datadog {
+namespace tracing {
+
+class DDSketch {
+ public:
+  // Construct a DDSketch with the specified `relative_accuracy` (alpha) and
+  // `max_num_bins` limit. The default parameters match the Datadog Agent:
+  //   - 1% relative accuracy (alpha = 0.01)
+  //   - 2048 maximum bins
+  explicit DDSketch(double relative_accuracy = 0.01,
+                    std::size_t max_num_bins = 2048);
+
+  // Insert the specified `value` (expected to be a non-negative duration in
+  // nanoseconds). Negative values are treated as zero.
+  void add(double value);
+
+  // Return the number of values that have been inserted.
+  std::uint64_t count() const;
+
+  // Return the sum of all values that have been inserted.
+  double sum() const;
+
+  // Return the minimum value that has been inserted, or 0 if empty.
+  double min() const;
+
+  // Return the maximum value that has been inserted, or 0 if empty.
+  double max() const;
+
+  // Return the average of all values that have been inserted, or 0 if empty.
+  double avg() const;
+
+  // Return whether the sketch is empty (no values inserted).
+  bool empty() const;
+
+  // Reset the sketch to its initial state.
+  void clear();
+
+  // Append the msgpack-encoded sketch to `destination`. The encoding follows
+  // the proto/v0.6 stats payload format (DDSketch proto message):
+  //   - mapping: {gamma, index_offset, interpolation}
+  //   - positive_values: {contiguous_bin_counts, contiguous_bin_index_offset}
+  //   - zero_count
+  void msgpack_encode(std::string& destination) const;
+
+ private:
+  // Return the bin index for the specified `value`.
+  int key(double value) const;
+
+  // Return the lower bound value of the bin at the specified `index`.
+  double lower_bound(int index) const;
+
+  double gamma_;
+  double log_gamma_;
+  std::size_t max_num_bins_;
+
+  // Contiguous bins stored from min_key_ onward.
+  // bins_[i] = count for key (min_key_ + i).
+  std::vector<std::uint64_t> bins_;
+  int min_key_ = 0;
+
+  std::uint64_t count_ = 0;
+  std::uint64_t zero_count_ = 0;
+  double sum_ = 0.0;
+  double min_ = 0.0;
+  double max_ = 0.0;
+};
+
+}  // namespace tracing
+}  // namespace datadog
diff --git a/src/datadog/stats_concentrator.cpp b/src/datadog/stats_concentrator.cpp
new file mode 100644
index 00000000..69a06682
--- /dev/null
+++ b/src/datadog/stats_concentrator.cpp
@@ -0,0 +1,638 @@
+#include "stats_concentrator.h"
+
+#include <algorithm>
+#include <charconv>
+#include <chrono>
+#include <cstdint>
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "msgpack.h"
+#include "span_data.h"
+#include "string_util.h"
+
+namespace datadog {
+namespace tracing {
+namespace {
+
+constexpr StringView stats_api_path = "/v0.6/stats";
+
+constexpr uint64_t bucket_duration_ns = static_cast<uint64_t>(
+    std::chrono::duration_cast<std::chrono::nanoseconds>(stats_bucket_duration)
+        .count());
+
+// Tags used for span eligibility and stats dimensions.
+constexpr StringView tag_measured = "_dd.measured";
+constexpr StringView tag_top_level = "_dd.top_level";
+constexpr StringView tag_span_kind = "span.kind";
+constexpr StringView tag_http_status_code = "http.status_code";
+constexpr StringView tag_http_method = "http.method";
+constexpr StringView tag_http_endpoint = "http.endpoint";
+constexpr StringView tag_origin = "_dd.origin";
+constexpr StringView tag_base_service = "_dd.base_service";
+
+// gRPC status code tag candidates (checked in order of precedence).
+constexpr StringView grpc_tag_candidates[] = {
+    "rpc.grpc.status_code",
+    "grpc.code",
+    "rpc.grpc.status.code",
+    "grpc.status.code",
+};
+
+// Peer tag keys to extract for client/producer/consumer spans.
+// This list matches the Go agent standard.
+constexpr StringView peer_tag_keys[] = {
+    "_dd.base_service",      "peer.service",
+    "peer.hostname",         "out.host",
+    "db.instance",           "db.system",
+    "messaging.destination", "network.destination.name",
+};
+
+// Span kinds that make a span eligible for stats.
+bool is_stats_span_kind(StringView kind) {
+  return kind == "server" || kind == "client" || kind == "producer" ||
+         kind == "consumer";
+}
+
+// Span kinds that should include peer tags.
+bool is_peer_tag_span_kind(StringView kind) {
+  return kind == "client" || kind == "producer" || kind == "consumer";
+}
+
+Optional<StringView> lookup_tag(
+    const std::unordered_map<std::string, std::string>& tags, StringView key) {
+  auto it = tags.find(std::string(key));
+  if (it != tags.end()) {
+    return StringView(it->second);
+  }
+  return nullopt;
+}
+
+Optional<double> lookup_numeric_tag(
+    const std::unordered_map<std::string, double>& tags, StringView key) {
+  auto it = tags.find(std::string(key));
+  if (it != tags.end()) {
+    return it->second;
+  }
+  return nullopt;
+}
+
+// Parse a base-10 status code. Returns 0 when `value` does not start with a
+// number (0 is also returned for a literal "0"; HTTP status 0 means "unset").
+uint32_t parse_status_code(StringView value) {
+  uint32_t result = 0;
+  auto [ptr, ec] =
+      std::from_chars(value.data(), value.data() + value.size(), result);
+  if (ec != std::errc()) {
+    return 0;
+  }
+  return result;
+}
+
+HTTPClient::URL stats_endpoint(const HTTPClient::URL& agent_url) {
+  auto url = agent_url;
+  append(url.path, stats_api_path);
+  return url;
+}
+
+// Combine hash values (boost-style).
+inline void hash_combine(std::size_t& seed, std::size_t value) {
+  seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+}
+
+// POST an already-encoded stats payload to the Agent, logging any failure.
+// Shared by StatsConcentrator::flush and StatsConcentrator::flush_all so the
+// headers, deadline, and error handling cannot drift between the two paths.
+void post_stats_body(HTTPClient& client, const HTTPClient::URL& endpoint,
+                     const std::shared_ptr<Logger>& logger, std::string body,
+                     std::string status_error_context,
+                     std::string request_error_prefix,
+                     std::string submit_error_prefix) {
+  auto set_headers = [](DictWriter& writer) {
+    writer.set("Content-Type", "application/msgpack");
+    writer.set("Datadog-Client-Computed-Stats", "yes");
+    writer.set("Datadog-Client-Computed-Top-Level", "yes");
+  };
+
+  auto on_response = [logger, context = std::move(status_error_context)](
+                         int status, const DictReader& /*headers*/,
+                         std::string response_body) {
+    if (status < 200 || status >= 300) {
+      logger->log_error([&](auto& stream) {
+        stream << context << ": unexpected response status " << status
+               << " with body: " << response_body;
+      });
+    }
+    // Fire-and-forget: we don't process the response body.
+  };
+
+  auto on_error = [logger, prefix = std::move(request_error_prefix)](
+                      Error error) {
+    logger->log_error(error.with_prefix(prefix));
+  };
+
+  // I/O deadline for the request itself (2 seconds). This intentionally uses
+  // the steady clock — the tracer's `Clock` is not available here.
+  auto deadline =
+      std::chrono::steady_clock::now() + std::chrono::milliseconds(2000);
+
+  auto result =
+      client.post(endpoint, std::move(set_headers), std::move(body),
+                  std::move(on_response), std::move(on_error), deadline);
+  if (auto* error = result.if_error()) {
+    logger->log_error(error->with_prefix(submit_error_prefix));
+  }
+}
+
+}  // namespace
+
+// -- StatsAggregationKey --
+
+bool StatsAggregationKey::operator==(const StatsAggregationKey& other) const {
+  return service == other.service && name == other.name &&
+         resource == other.resource && type == other.type &&
+         http_status_code == other.http_status_code &&
+         grpc_status_code == other.grpc_status_code &&
+         span_kind == other.span_kind && synthetics == other.synthetics &&
+         is_trace_root == other.is_trace_root &&
+         peer_tags_hash == other.peer_tags_hash &&
+         http_method == other.http_method &&
+         http_endpoint == other.http_endpoint;
+}
+
+std::size_t StatsAggregationKeyHash::operator()(
+    const StatsAggregationKey& key) const {
+  std::size_t seed = 0;
+  std::hash<std::string> hasher;
+  std::hash<std::uint32_t> uint_hasher;
+  std::hash<bool> bool_hasher;
+
+  hash_combine(seed, hasher(key.service));
+  hash_combine(seed, hasher(key.name));
+  hash_combine(seed, hasher(key.resource));
+  hash_combine(seed, hasher(key.type));
+  hash_combine(seed, uint_hasher(key.http_status_code));
+  hash_combine(seed, hasher(key.grpc_status_code));
+  hash_combine(seed, hasher(key.span_kind));
+  hash_combine(seed, bool_hasher(key.synthetics));
+  hash_combine(seed,
+               uint_hasher(static_cast<std::uint32_t>(key.is_trace_root)));
+  hash_combine(seed, hasher(key.peer_tags_hash));
+  hash_combine(seed, hasher(key.http_method));
+  hash_combine(seed, hasher(key.http_endpoint));
+  return seed;
+}
+
+// -- Free functions --
+
+bool is_top_level(const SpanData& span) {
+  // A span is top-level if parent_id == 0, or if _dd.top_level == 1.
+  if (span.parent_id == 0) {
+    return true;
+  }
+  auto it = span.numeric_tags.find(std::string(tag_top_level));
+  return it != span.numeric_tags.end() && it->second == 1.0;
+}
+
+bool is_measured(const SpanData& span) {
+  auto it = span.numeric_tags.find(std::string(tag_measured));
+  return it != span.numeric_tags.end() && it->second == 1.0;
+}
+
+bool is_stats_eligible(const SpanData& span) {
+  if (is_top_level(span) || is_measured(span)) {
+    return true;
+  }
+  auto kind = lookup_tag(span.tags, tag_span_kind);
+  return kind && is_stats_span_kind(*kind);
+}
+
+std::string extract_grpc_status_code(const SpanData& span) {
+  for (const auto& tag_name : grpc_tag_candidates) {
+    // Check string tags first. Note: "0" (gRPC OK) is a valid status code,
+    // so a parsed value of zero must NOT be treated as "absent" — instead we
+    // require that the whole tag value is numeric.
+    if (auto str_val = lookup_tag(span.tags, tag_name)) {
+      uint32_t code = 0;
+      const char* const first = str_val->data();
+      const char* const last = first + str_val->size();
+      auto [ptr, ec] = std::from_chars(first, last, code);
+      if (ec == std::errc() && ptr == last) {
+        return std::to_string(code);
+      }
+    }
+    // Check numeric tags.
+    if (auto num_val = lookup_numeric_tag(span.numeric_tags, tag_name)) {
+      return std::to_string(static_cast<uint32_t>(*num_val));
+    }
+  }
+  return "";
+}
+
+std::uint32_t extract_http_status_code(const SpanData& span) {
+  auto val = lookup_tag(span.tags, tag_http_status_code);
+  if (val) {
+    return parse_status_code(*val);
+  }
+  auto num_val = lookup_numeric_tag(span.numeric_tags, tag_http_status_code);
+  if (num_val) {
+    return static_cast<std::uint32_t>(*num_val);
+  }
+  return 0;
+}
+
+// -- StatsConcentrator --
+
+StatsConcentrator::StatsConcentrator(
+    const std::shared_ptr<HTTPClient>& http_client,
+    const HTTPClient::URL& agent_url, const std::shared_ptr<Logger>& logger,
+    std::string hostname, std::string env, std::string version,
+    std::string service, std::string lang, std::string tracer_version,
+    std::string runtime_id)
+    : http_client_(http_client),
+      stats_endpoint_(stats_endpoint(agent_url)),
+      logger_(logger),
+      hostname_(std::move(hostname)),
+      env_(std::move(env)),
+      version_(std::move(version)),
+      service_(std::move(service)),
+      lang_(std::move(lang)),
+      tracer_version_(std::move(tracer_version)),
+      runtime_id_(std::move(runtime_id)) {}
+
+void StatsConcentrator::add(const SpanData& span) {
+  if (!is_stats_eligible(span)) {
+    return;
+  }
+
+  // Calculate span end time in nanoseconds; stats are bucketed by the span's
+  // *end* time, aligned down to a bucket boundary.
+  auto start_ns = static_cast<uint64_t>(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          span.start.wall.time_since_epoch())
+          .count());
+  auto dur_ns = static_cast<uint64_t>(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(span.duration)
+          .count());
+  uint64_t end_ns = start_ns + dur_ns;
+
+  uint64_t bucket_start = align_timestamp(end_ns);
+
+  // NOTE(review): make_key() also extracts peer tags internally, so peer tags
+  // are computed twice per span here; harmless but could be unified.
+  auto key = make_key(span);
+  std::vector<std::string> peer_tags = extract_peer_tags(span);
+
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  auto& bucket = get_or_create_bucket(bucket_start);
+  auto& group = bucket.groups[key];
+
+  group.hits++;
+  group.duration += dur_ns;
+
+  if (span.error) {
+    group.errors++;
+    group.error_sketch.add(static_cast<double>(dur_ns));
+  } else {
+    group.ok_sketch.add(static_cast<double>(dur_ns));
+  }
+
+  if (group.peer_tags_serialized.empty() && !peer_tags.empty()) {
+    group.peer_tags_serialized = std::move(peer_tags);
+  }
+}
+
+void StatsConcentrator::flush(TimePoint now) {
+  auto now_ns = static_cast<uint64_t>(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          now.wall.time_since_epoch())
+          .count());
+
+  std::vector<StatsBucket> to_flush;
+
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto it = buckets_.begin();
+    while (it != buckets_.end()) {
+      // Flush buckets whose end time is before now.
+      if (it->second.start_ns + it->second.duration_ns <= now_ns) {
+        to_flush.push_back(std::move(it->second));
+        it = buckets_.erase(it);
+      } else {
+        ++it;
+      }
+    }
+  }
+
+  if (to_flush.empty()) {
+    return;
+  }
+
+  post_stats_body(*http_client_, stats_endpoint_, logger_,
+                  encode_payload(to_flush), "Stats flush",
+                  "Error during stats flush HTTP request: ",
+                  "Unexpected error submitting stats: ");
+}
+
+void StatsConcentrator::flush_all() {
+  std::vector<StatsBucket> to_flush;
+
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    to_flush.reserve(buckets_.size());
+    for (auto& [ts, bucket] : buckets_) {
+      to_flush.push_back(std::move(bucket));
+    }
+    buckets_.clear();
+  }
+
+  if (to_flush.empty()) {
+    return;
+  }
+
+  post_stats_body(*http_client_, stats_endpoint_, logger_,
+                  encode_payload(to_flush), "Stats flush_all",
+                  "Error during stats flush_all HTTP request: ",
+                  "Unexpected error submitting final stats: ");
+}
+
+std::size_t StatsConcentrator::bucket_count() const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return buckets_.size();
+}
+
+uint64_t StatsConcentrator::align_timestamp(uint64_t end_time_ns) {
+  // Align down to the bucket (10-second) boundary.
+  return (end_time_ns / bucket_duration_ns) * bucket_duration_ns;
+}
+
+StatsBucket& StatsConcentrator::get_or_create_bucket(uint64_t bucket_start_ns) {
+  auto it = buckets_.find(bucket_start_ns);
+  if (it != buckets_.end()) {
+    return it->second;
+  }
+  auto& bucket = buckets_[bucket_start_ns];
+  bucket.start_ns = bucket_start_ns;
+  bucket.duration_ns = bucket_duration_ns;
+  return bucket;
+}
+
+StatsAggregationKey StatsConcentrator::make_key(const SpanData& span) {
+  StatsAggregationKey key;
+
+  key.service = span.service;
+  key.name = span.name;
+  key.resource = span.resource;
+  key.type = span.service_type;
+
+  key.http_status_code = extract_http_status_code(span);
+  key.grpc_status_code = extract_grpc_status_code(span);
+
+  auto kind = lookup_tag(span.tags, tag_span_kind);
+  if (kind) {
+    key.span_kind = std::string(*kind);
+  }
+
+  // Synthetics detection: _dd.origin == "synthetics" or
+  // _dd.origin starts with "synthetics-"
+  auto origin = lookup_tag(span.tags, tag_origin);
+  if (origin) {
+    key.synthetics =
+        (*origin == "synthetics" || starts_with(*origin, "synthetics-"));
+  }
+
+  // is_trace_root: TRUE if parent_id == 0, FALSE otherwise.
+  key.is_trace_root = (span.parent_id == 0) ? Trilean::TRUE : Trilean::FALSE;
+
+  // Build a hash string from peer tags for use as an aggregation key
+  // dimension.
+  {
+    auto ptags = extract_peer_tags(span);
+    std::string hash_str;
+    for (std::size_t i = 0; i < ptags.size(); ++i) {
+      if (i > 0) hash_str += ',';
+      hash_str += ptags[i];
+    }
+    key.peer_tags_hash = std::move(hash_str);
+  }
+
+  auto http_method = lookup_tag(span.tags, tag_http_method);
+  if (http_method) {
+    key.http_method = std::string(*http_method);
+  }
+
+  auto http_endpoint = lookup_tag(span.tags, tag_http_endpoint);
+  if (http_endpoint) {
+    key.http_endpoint = std::string(*http_endpoint);
+  }
+
+  return key;
+}
+
+std::vector<std::string> StatsConcentrator::extract_peer_tags(
+    const SpanData& span) {
+  auto kind = lookup_tag(span.tags, tag_span_kind);
+  bool should_extract = false;
+
+  if (kind && is_peer_tag_span_kind(*kind)) {
+    should_extract = true;
+  }
+
+  // Internal span with _dd.base_service override.
+  if (!should_extract) {
+    auto base_service = lookup_tag(span.tags, tag_base_service);
+    if (base_service && *base_service != span.service) {
+      should_extract = true;
+    }
+  }
+
+  if (!should_extract) {
+    return {};
+  }
+
+  // Build a sorted array of "key:value" peer tag strings.
+  std::vector<std::string> tags;
+  for (const auto& tag_key : peer_tag_keys) {
+    auto val = lookup_tag(span.tags, tag_key);
+    if (val) {
+      std::string entry;
+      entry += std::string(tag_key);
+      entry += ':';
+      entry += std::string(*val);
+      tags.push_back(std::move(entry));
+    }
+  }
+
+  std::sort(tags.begin(), tags.end());
+  return tags;
+}
+
+std::string StatsConcentrator::encode_payload(
+    const std::vector<StatsBucket>& buckets) {
+  std::string payload;
+
+  // The /v0.6/stats payload is a msgpack map:
+  // {
+  //   "Hostname": string,
+  //   "Env": string,
+  //   "Version": string,
+  //   "Lang": string,
+  //   "TracerVersion": string,
+  //   "RuntimeID": string,
+  //   "Sequence": uint64,
+  //   "Service": string,
+  //   "Stats": [
+  //     {
+  //       "Start": uint64 (bucket start in ns),
+  //       "Duration": uint64 (bucket duration in ns),
+  //       "Stats": [
+  //         {
+  //           "Service": string,
+  //           "Name": string,
+  //           "Resource": string,
+  //           "Type": string,
+  //           "HTTPStatusCode": uint32,
+  //           "Hits": uint64,
+  //           "Errors": uint64,
+  //           "Duration": uint64,
+  //           "OkSummary": DDSketch,
+  //           "ErrorSummary": DDSketch,
+  //           "SpanKind": string,
+  //           "Synthetics": bool,
+  //           "IsTraceRoot": Trilean,
+  //           "PeerTags": [string, ...] (array of "key:value"),
+  //           "GRPCStatusCode": string,
+  //           "HTTPMethod": string,
+  //           "HTTPEndpoint": string,
+  //         }, ...
+  //       ]
+  //     }, ...
+  //   ]
+  // }
+
+  // clang-format off
+  // Top-level map with 9 keys.
+  msgpack::pack_map(payload, 9);
+
+  msgpack::pack_string(payload, "Hostname");
+  msgpack::pack_string(payload, hostname_);
+
+  msgpack::pack_string(payload, "Env");
+  msgpack::pack_string(payload, env_);
+
+  msgpack::pack_string(payload, "Version");
+  msgpack::pack_string(payload, version_);
+
+  msgpack::pack_string(payload, "Lang");
+  msgpack::pack_string(payload, lang_);
+
+  msgpack::pack_string(payload, "TracerVersion");
+  msgpack::pack_string(payload, tracer_version_);
+
+  msgpack::pack_string(payload, "RuntimeID");
+  msgpack::pack_string(payload, runtime_id_);
+
+  msgpack::pack_string(payload, "Sequence");
+  msgpack::pack_integer(payload, sequence_++);
+
+  msgpack::pack_string(payload, "Service");
+  msgpack::pack_string(payload, service_);
+
+  msgpack::pack_string(payload, "Stats");
+  msgpack::pack_array(payload, buckets.size());
+
+  for (const auto& bucket : buckets) {
+    // Each bucket has 3 keys: Start, Duration, Stats.
+    msgpack::pack_map(payload, 3);
+
+    msgpack::pack_string(payload, "Start");
+    msgpack::pack_integer(payload, static_cast<std::int64_t>(bucket.start_ns));
+
+    msgpack::pack_string(payload, "Duration");
+    msgpack::pack_integer(payload, static_cast<std::int64_t>(bucket.duration_ns));
+
+    msgpack::pack_string(payload, "Stats");
+    msgpack::pack_array(payload, bucket.groups.size());
+
+    for (const auto& [agg_key, group] : bucket.groups) {
+      // Each stats group has 17 keys.
+      msgpack::pack_map(payload, 17);
+
+      msgpack::pack_string(payload, "Service");
+      msgpack::pack_string(payload, agg_key.service);
+
+      msgpack::pack_string(payload, "Name");
+      msgpack::pack_string(payload, agg_key.name);
+
+      msgpack::pack_string(payload, "Resource");
+      msgpack::pack_string(payload, agg_key.resource);
+
+      msgpack::pack_string(payload, "Type");
+      msgpack::pack_string(payload, agg_key.type);
+
+      msgpack::pack_string(payload, "HTTPStatusCode");
+      msgpack::pack_integer(payload, static_cast<std::int64_t>(agg_key.http_status_code));
+
+      msgpack::pack_string(payload, "Hits");
+      msgpack::pack_integer(payload, group.hits);
+
+      msgpack::pack_string(payload, "Errors");
+      msgpack::pack_integer(payload, group.errors);
+
+      msgpack::pack_string(payload, "Duration");
+      msgpack::pack_integer(payload, group.duration);
+
+      // NOTE(review): in the Agent's ClientGroupedStats proto, OkSummary and
+      // ErrorSummary are `bytes` holding a protobuf-encoded DDSketch; verify
+      // that this msgpack-map encoding is what the ingestion endpoint
+      // actually accepts.
+      msgpack::pack_string(payload, "OkSummary");
+      group.ok_sketch.msgpack_encode(payload);
+
+      msgpack::pack_string(payload, "ErrorSummary");
+      group.error_sketch.msgpack_encode(payload);
+
+      msgpack::pack_string(payload, "SpanKind");
+      msgpack::pack_string(payload, agg_key.span_kind);
+
+      // NOTE(review): the proto field is a bool; packing it as an int works
+      // only if the Agent's decoder accepts ints for bools — confirm, or add
+      // a pack_bool helper to msgpack.h.
+      msgpack::pack_string(payload, "Synthetics");
+      msgpack::pack_integer(payload, std::int64_t(agg_key.synthetics ? 1 : 0));
+
+      msgpack::pack_string(payload, "IsTraceRoot");
+      msgpack::pack_integer(payload, std::int64_t(static_cast<std::uint32_t>(agg_key.is_trace_root)));
+
+      msgpack::pack_string(payload, "PeerTags");
+      msgpack::pack_array(payload, group.peer_tags_serialized.size());
+      for (const auto& ptag : group.peer_tags_serialized) {
+        msgpack::pack_string(payload, ptag);
+      }
+
+      msgpack::pack_string(payload, "GRPCStatusCode");
+      msgpack::pack_string(payload, agg_key.grpc_status_code);
+
+      msgpack::pack_string(payload, "HTTPMethod");
+      msgpack::pack_string(payload, agg_key.http_method);
+
+      msgpack::pack_string(payload, "HTTPEndpoint");
+      msgpack::pack_string(payload, agg_key.http_endpoint);
+    }
+  }
+  // clang-format on
+
+  return payload;
+}
+
+}  // namespace tracing
+}  // namespace datadog
diff --git a/src/datadog/stats_concentrator.h b/src/datadog/stats_concentrator.h
new file mode 100644
index 00000000..2467a91b
--- /dev/null
+++ b/src/datadog/stats_concentrator.h
@@ -0,0 +1,184 @@
+#pragma once
+
+// This component provides the `StatsConcentrator` class, which aggregates span
+// metrics into 10-second time buckets for the Client-Side Stats Computation
+// (CSS) feature.
+//
+// When CSS is enabled, the tracer computes trace metrics (hit counts, error
+// counts, duration distributions) locally rather than relying on the Datadog
+// Agent. This allows the Agent to drop priority-0 (unsampled) traces without
+// losing visibility into service-level metrics.
+//
+// Each finished span that is "eligible" (top-level, measured, or has a
+// span.kind of server/client/producer/consumer) is added to the concentrator.
+// The concentrator groups spans by a 12-dimension aggregation key and
+// accumulates counts and DDSketch distributions in 10-second time buckets.
+//
+// Periodically (at flush time), the concentrator serializes the accumulated
+// stats and posts them to the Datadog Agent at POST /v0.6/stats.
+
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+
+#include <datadog/clock.h>
+#include <datadog/http_client.h>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "ddsketch.h"
+
+namespace datadog {
+namespace tracing {
+
+struct SpanData;
+class Logger;
+
+// The time bucket duration: 10 seconds.
+constexpr auto stats_bucket_duration = std::chrono::seconds(10);
+
+// Trilean: used for the is_trace_root dimension.
+enum class Trilean : std::uint32_t {
+  NOT_SET = 0,
+  TRUE = 1,
+  FALSE = 2,
+};
+
+// The 12-dimension aggregation key.
+struct StatsAggregationKey {
+  std::string service;
+  std::string name;  // operation name
+  std::string resource;
+  std::string type;
+  std::uint32_t http_status_code = 0;
+  std::string grpc_status_code;
+  std::string span_kind;
+  bool synthetics = false;
+  Trilean is_trace_root = Trilean::NOT_SET;
+  std::string peer_tags_hash;  // serialized peer tags for hashing
+  std::string http_method;
+  std::string http_endpoint;
+
+  bool operator==(const StatsAggregationKey& other) const;
+};
+
+// Hash function for StatsAggregationKey.
+struct StatsAggregationKeyHash {
+  std::size_t operator()(const StatsAggregationKey& key) const;
+};
+
+// The stats for a single aggregation key within a single time bucket.
+struct StatsGroupData {
+  std::uint64_t hits = 0;
+  std::uint64_t errors = 0;
+  std::uint64_t duration = 0;  // total duration in nanoseconds
+  DDSketch ok_sketch{0.01, 2048};
+  DDSketch error_sketch{0.01, 2048};
+  std::vector<std::string>
+      peer_tags_serialized;  // array of "key:value" strings for payload
+};
+
+// A single 10-second time bucket.
+struct StatsBucket {
+  // The start time of this bucket (as a Unix timestamp in nanoseconds,
+  // aligned to a 10-second boundary).
+  std::uint64_t start_ns = 0;
+  // Duration of the bucket in nanoseconds (always 10 seconds).
+  std::uint64_t duration_ns = 0;
+
+  std::unordered_map<StatsAggregationKey, StatsGroupData,
+                     StatsAggregationKeyHash>
+      groups;
+};
+
+// Return whether the specified `span` is eligible for stats computation.
+// Eligible means: top-level OR measured OR span_kind in
+// {server,client,producer,consumer}.
+bool is_stats_eligible(const SpanData& span);
+
+// Return whether the specified `span` is top-level (no parent, or
+// _dd.top_level == 1).
+bool is_top_level(const SpanData& span);
+
+// Return whether the specified `span` is measured (_dd.measured == 1).
+bool is_measured(const SpanData& span);
+
+// Extract the gRPC status code from span tags, checking multiple tag names.
+// Returns a string representation (e.g., "0", "2", "14") to match the
+// protobuf definition (string GRPC_status_code = 18).
+std::string extract_grpc_status_code(const SpanData& span);
+
+// Extract the HTTP status code from span tags.
+std::uint32_t extract_http_status_code(const SpanData& span);
+
+class StatsConcentrator {
+ public:
+  // Construct a concentrator that will use the specified `http_client` to post
+  // stats to the agent at the specified `agent_url`, with the specified
+  // `logger` for diagnostics, and the specified `hostname`, `env`, and
+  // `version` for the stats payload metadata.
+  StatsConcentrator(const std::shared_ptr<HTTPClient>& http_client,
+                    const HTTPClient::URL& agent_url,
+                    const std::shared_ptr<Logger>& logger, std::string hostname,
+                    std::string env, std::string version, std::string service,
+                    std::string lang = "cpp", std::string tracer_version = "",
+                    std::string runtime_id = "");
+
+  // Add the specified `span` to the concentrator. The span must be eligible
+  // (the caller should check `is_stats_eligible` first, though this method
+  // will check again).
+  void add(const SpanData& span);
+
+  // Flush all complete buckets (those whose end time is before `now`) and
+  // POST them to the agent. This is typically called from the periodic flush
+  // timer.
+  void flush(TimePoint now);
+
+  // Flush all buckets (including the current one). Called at shutdown.
+  void flush_all();
+
+  // Return the number of currently accumulated buckets (for testing).
+ std::size_t bucket_count() const; + + // Encode the specified `buckets` into msgpack for POST /v0.6/stats. + // NOTE: not const because it increments the sequence counter. + std::string encode_payload(const std::vector& buckets); + + private: + // Return the bucket start time (aligned to 10-second boundary) for the + // specified span end time. + static std::uint64_t align_timestamp(std::uint64_t end_time_ns); + + // Get or create the bucket for the specified aligned start time. + StatsBucket& get_or_create_bucket(std::uint64_t bucket_start_ns); + + // Build the StatsAggregationKey from a span. + static StatsAggregationKey make_key(const SpanData& span); + + // Extract peer tags from a span (for client/producer/consumer kinds, or + // internal with _dd.base_service override). + static std::vector extract_peer_tags(const SpanData& span); + + mutable std::mutex mutex_; + std::shared_ptr http_client_; + HTTPClient::URL stats_endpoint_; + std::shared_ptr logger_; + + std::string hostname_; + std::string env_; + std::string version_; + std::string service_; + std::string lang_; + std::string tracer_version_; + std::string runtime_id_; + std::uint64_t sequence_ = 0; + + // Buckets keyed by their aligned start timestamp (nanoseconds). 
+ std::unordered_map buckets_; +}; + +} // namespace tracing +} // namespace datadog diff --git a/supported-configurations.json b/supported-configurations.json index 4ee3558a..1ce047e6 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -266,6 +266,13 @@ "type": "BOOLEAN" } ], + "DD_TRACE_STATS_COMPUTATION_ENABLED": [ + { + "default": "0", + "implementation": "A", + "type": "BOOLEAN" + } + ], "DD_TRACE_TAGS_PROPAGATION_MAX_LENGTH": [ { "default": "512", diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7571aa8b..5c2a4ea6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,6 +40,8 @@ add_executable(tests test_tracer.cpp test_trace_sampler.cpp test_endpoint_inferral.cpp + test_ddsketch.cpp + test_stats_concentrator.cpp remote_config/test_remote_config.cpp ) diff --git a/test/test_ddsketch.cpp b/test/test_ddsketch.cpp new file mode 100644 index 00000000..fa79c6f2 --- /dev/null +++ b/test/test_ddsketch.cpp @@ -0,0 +1,136 @@ +#include +#include + +#include "ddsketch.h" +#include "test.h" + +using namespace datadog::tracing; + +#define DDSKETCH_TEST(x) TEST_CASE(x, "[ddsketch]") + +DDSKETCH_TEST("empty sketch") { + DDSketch sketch; + + CHECK(sketch.empty()); + CHECK(sketch.count() == 0); + CHECK(sketch.sum() == 0.0); + CHECK(sketch.min() == 0.0); + CHECK(sketch.max() == 0.0); + CHECK(sketch.avg() == 0.0); +} + +DDSKETCH_TEST("single value") { + DDSketch sketch; + sketch.add(100.0); + + CHECK(!sketch.empty()); + CHECK(sketch.count() == 1); + CHECK(sketch.sum() == 100.0); + CHECK(sketch.min() == 100.0); + CHECK(sketch.max() == 100.0); + CHECK(sketch.avg() == 100.0); +} + +DDSKETCH_TEST("multiple values") { + DDSketch sketch; + sketch.add(10.0); + sketch.add(20.0); + sketch.add(30.0); + + CHECK(sketch.count() == 3); + CHECK(sketch.sum() == Approx(60.0)); + CHECK(sketch.min() == Approx(10.0)); + CHECK(sketch.max() == Approx(30.0)); + CHECK(sketch.avg() == Approx(20.0)); +} + +DDSKETCH_TEST("zero value goes to zero 
bucket") { + DDSketch sketch; + sketch.add(0.0); + + CHECK(sketch.count() == 1); + CHECK(sketch.sum() == 0.0); + CHECK(sketch.min() == 0.0); + CHECK(sketch.max() == 0.0); +} + +DDSKETCH_TEST("negative value treated as zero") { + DDSketch sketch; + sketch.add(-5.0); + + CHECK(sketch.count() == 1); + CHECK(sketch.sum() == 0.0); + CHECK(sketch.min() == 0.0); + CHECK(sketch.max() == 0.0); +} + +DDSKETCH_TEST("very small positive value goes to zero bucket") { + DDSketch sketch; + sketch.add(1e-12); // below the min_positive threshold + + CHECK(sketch.count() == 1); + CHECK(sketch.sum() == Approx(1e-12)); +} + +DDSKETCH_TEST("large number of values") { + DDSketch sketch; + double total = 0.0; + for (int i = 1; i <= 1000; ++i) { + double val = static_cast(i); + sketch.add(val); + total += val; + } + + CHECK(sketch.count() == 1000); + CHECK(sketch.sum() == Approx(total)); + CHECK(sketch.min() == Approx(1.0)); + CHECK(sketch.max() == Approx(1000.0)); + CHECK(sketch.avg() == Approx(500.5)); +} + +DDSKETCH_TEST("clear resets the sketch") { + DDSketch sketch; + sketch.add(100.0); + sketch.add(200.0); + + sketch.clear(); + + CHECK(sketch.empty()); + CHECK(sketch.count() == 0); + CHECK(sketch.sum() == 0.0); +} + +DDSKETCH_TEST("msgpack_encode produces non-empty output") { + DDSketch sketch; + sketch.add(1000000.0); // 1ms in nanoseconds + sketch.add(5000000.0); // 5ms + sketch.add(10000000.0); // 10ms + + std::string encoded; + sketch.msgpack_encode(encoded); + + // The output should be non-empty. + CHECK(!encoded.empty()); + // It should start with a msgpack map marker. + CHECK(static_cast(encoded[0]) == 0xDF); // MAP32 +} + +DDSKETCH_TEST("msgpack_encode of empty sketch") { + DDSketch sketch; + + std::string encoded; + sketch.msgpack_encode(encoded); + + // Even an empty sketch should produce a valid encoding. 
+ CHECK(!encoded.empty()); +} + +DDSKETCH_TEST("custom relative accuracy") { + DDSketch sketch(0.05, 1024); // 5% accuracy, 1024 bins + + sketch.add(100.0); + sketch.add(200.0); + + CHECK(sketch.count() == 2); + CHECK(sketch.sum() == Approx(300.0)); +} diff --git a/test/test_stats_concentrator.cpp b/test/test_stats_concentrator.cpp new file mode 100644 index 00000000..f81e3874 --- /dev/null +++ b/test/test_stats_concentrator.cpp @@ -0,0 +1,440 @@ +#include +#include +#include +#include +#include + +#include "mocks/http_clients.h" +#include "mocks/loggers.h" +#include "span_data.h" +#include "stats_concentrator.h" +#include "test.h" + +using namespace datadog; +using namespace datadog::tracing; +using namespace std::chrono_literals; + +#define STATS_TEST(x) TEST_CASE(x, "[stats_concentrator]") + +namespace { + +// Create a SpanData with reasonable defaults for testing. +std::unique_ptr make_span( + const std::string& service, const std::string& name, + const std::string& resource, bool is_error = false, + std::uint64_t parent_id = 0, + std::chrono::nanoseconds duration = 1000000ns) { + auto span = std::make_unique(); + span->service = service; + span->name = name; + span->resource = resource; + span->parent_id = parent_id; + span->error = is_error; + + // Set start to a known time, e.g. 
2024-01-01 00:00:00 UTC + auto wall = + std::chrono::system_clock::time_point(std::chrono::seconds(1704067200)); + auto tick = std::chrono::steady_clock::now(); + span->start = TimePoint{wall, tick}; + span->duration = duration; + + return span; +} + +} // namespace + +STATS_TEST("is_top_level: parent_id == 0") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 0; + + CHECK(is_top_level(*span)); +} + +STATS_TEST("is_top_level: parent_id != 0 without tag") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + + CHECK(!is_top_level(*span)); +} + +STATS_TEST("is_top_level: _dd.top_level numeric tag == 1") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + span->numeric_tags["_dd.top_level"] = 1.0; + + CHECK(is_top_level(*span)); +} + +STATS_TEST("is_measured: _dd.measured numeric tag == 1") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + span->numeric_tags["_dd.measured"] = 1.0; + + CHECK(is_measured(*span)); +} + +STATS_TEST("is_measured: no tag") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + + CHECK(!is_measured(*span)); +} + +STATS_TEST("is_stats_eligible: top-level span is eligible") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 0; + + CHECK(is_stats_eligible(*span)); +} + +STATS_TEST("is_stats_eligible: measured span is eligible") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + span->numeric_tags["_dd.measured"] = 1.0; + + CHECK(is_stats_eligible(*span)); +} + +STATS_TEST("is_stats_eligible: server span kind is eligible") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + span->tags["span.kind"] = "server"; + + CHECK(is_stats_eligible(*span)); +} + +STATS_TEST("is_stats_eligible: client span kind is eligible") { + auto span = make_span("svc", "op", "res"); + span->parent_id = 12345; + span->tags["span.kind"] = "client"; + + CHECK(is_stats_eligible(*span)); +} + 
+STATS_TEST("is_stats_eligible: producer span kind is eligible") {
+  auto span = make_span("svc", "op", "res");
+  span->parent_id = 12345;  // non-root, so eligibility must come from span.kind
+  span->tags["span.kind"] = "producer";  // one of the four eligible kinds
+
+  CHECK(is_stats_eligible(*span));
+}
+
+STATS_TEST("is_stats_eligible: consumer span kind is eligible") {
+  auto span = make_span("svc", "op", "res");
+  span->parent_id = 12345;  // non-root, so eligibility must come from span.kind
+  span->tags["span.kind"] = "consumer";  // one of the four eligible kinds
+
+  CHECK(is_stats_eligible(*span));
+}
+
+STATS_TEST("is_stats_eligible: internal span without other tags is not") {
+  auto span = make_span("svc", "op", "res");
+  span->parent_id = 12345;  // not top-level
+  span->tags["span.kind"] = "internal";  // not in {server,client,producer,consumer}
+
+  CHECK(!is_stats_eligible(*span));
+}
+
+STATS_TEST("is_stats_eligible: span with no special tags and parent is not") {
+  auto span = make_span("svc", "op", "res");
+  span->parent_id = 12345;  // not top-level, not measured, no span.kind
+
+  CHECK(!is_stats_eligible(*span));
+}
+
+STATS_TEST("extract_http_status_code: from string tag") {
+  auto span = make_span("svc", "op", "res");
+  span->tags["http.status_code"] = "200";  // string tag is parsed to an integer
+
+  CHECK(extract_http_status_code(*span) == 200);
+}
+
+STATS_TEST("extract_http_status_code: from numeric tag") {
+  auto span = make_span("svc", "op", "res");
+  span->numeric_tags["http.status_code"] = 404.0;  // numeric tag also accepted
+
+  CHECK(extract_http_status_code(*span) == 404);
+}
+
+STATS_TEST("extract_http_status_code: no tag") {
+  auto span = make_span("svc", "op", "res");
+
+  CHECK(extract_http_status_code(*span) == 0);  // absent tag yields 0
+}
+
+STATS_TEST("extract_grpc_status_code: rpc.grpc.status_code tag") {
+  auto span = make_span("svc", "op", "res");
+  span->tags["rpc.grpc.status_code"] = "2";  // OTel-style string tag
+
+  CHECK(extract_grpc_status_code(*span) == "2");
+}
+
+STATS_TEST("extract_grpc_status_code: grpc.code numeric tag") {
+  auto span = make_span("svc", "op", "res");
+  span->numeric_tags["grpc.code"] = 14.0;  // numeric alias is stringified
+
+  CHECK(extract_grpc_status_code(*span) == "14");
+}
+
+STATS_TEST("extract_grpc_status_code: no tag") {
+  auto span = make_span("svc", "op", "res");
+
+  CHECK(extract_grpc_status_code(*span) == "");  // absent tag yields empty string
+}
+ +STATS_TEST("concentrator: add eligible span creates bucket") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "svc", "cpp"); + + auto span = make_span("svc", "web.request", "/api/test"); + concentrator.add(*span); + + CHECK(concentrator.bucket_count() == 1); +} + +STATS_TEST("concentrator: add ineligible span does not create bucket") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "svc", "cpp"); + + auto span = make_span("svc", "internal.op", "/internal"); + span->parent_id = 12345; // not top-level + // No measured tag, no special span.kind + concentrator.add(*span); + + CHECK(concentrator.bucket_count() == 0); +} + +STATS_TEST("concentrator: multiple spans in same bucket") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "svc", "cpp"); + + // Create two spans with the same start time (same 10s bucket). + auto span1 = make_span("svc", "web.request", "/api/test", false, 0, 1ms); + auto span2 = make_span("svc", "web.request", "/api/test", true, 0, 2ms); + + concentrator.add(*span1); + concentrator.add(*span2); + + // Both should be in the same bucket. 
+ CHECK(concentrator.bucket_count() == 1); +} + +STATS_TEST("concentrator: spans in different buckets") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "svc", "cpp"); + + auto span1 = make_span("svc", "web.request", "/api/test", false, 0, 1ms); + + // Create a span with a different start time (20 seconds later). + auto span2 = make_span("svc", "web.request", "/api/other", false, 0, 1ms); + span2->start.wall += std::chrono::seconds(20); + + concentrator.add(*span1); + concentrator.add(*span2); + + CHECK(concentrator.bucket_count() == 2); +} + +STATS_TEST("concentrator: encode_payload produces non-empty output") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "svc", "cpp"); + + StatsBucket bucket; + bucket.start_ns = 1704067200000000000ULL; + bucket.duration_ns = 10000000000ULL; + + StatsAggregationKey key; + key.service = "svc"; + key.name = "web.request"; + key.resource = "/api/test"; + key.type = "web"; + key.is_trace_root = Trilean::TRUE; + + StatsGroupData group; + group.hits = 5; + group.errors = 1; + group.duration = 50000000; // 50ms total + group.ok_sketch.add(10000000.0); // 10ms + group.error_sketch.add(5000000.0); // 5ms + + bucket.groups[key] = std::move(group); + + std::vector buckets = {std::move(bucket)}; + std::string payload = concentrator.encode_payload(buckets); + + CHECK(!payload.empty()); + // Should start with msgpack MAP32 marker. 
+ CHECK(static_cast(payload[0]) == 0xDF); +} + +STATS_TEST("StatsAggregationKey equality") { + StatsAggregationKey a; + a.service = "svc"; + a.name = "op"; + a.resource = "res"; + a.type = "web"; + a.http_status_code = 200; + a.is_trace_root = Trilean::TRUE; + + StatsAggregationKey b = a; + CHECK(a == b); + + b.service = "other-svc"; + CHECK(!(a == b)); +} + +STATS_TEST("StatsAggregationKey hash: equal keys same hash") { + StatsAggregationKey a; + a.service = "svc"; + a.name = "op"; + a.resource = "res"; + a.type = "web"; + + StatsAggregationKey b = a; + + StatsAggregationKeyHash hasher; + CHECK(hasher(a) == hasher(b)); +} + +STATS_TEST("StatsAggregationKey hash: different keys different hash") { + StatsAggregationKey a; + a.service = "svc-a"; + a.name = "op"; + + StatsAggregationKey b; + b.service = "svc-b"; + b.name = "op"; + + StatsAggregationKeyHash hasher; + // This could theoretically collide, but with these inputs it should not. + CHECK(hasher(a) != hasher(b)); +} + +STATS_TEST( + "encode_payload includes TracerVersion, RuntimeID, Sequence, Service") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "my-service", "cpp", "v2.0.1", + "abc-123-runtime-id"); + + StatsBucket bucket; + bucket.start_ns = 1704067200000000000ULL; + bucket.duration_ns = 10000000000ULL; + + StatsAggregationKey key; + key.service = "svc"; + key.name = "web.request"; + key.resource = "/api/test"; + key.type = "web"; + key.is_trace_root = Trilean::TRUE; + + StatsGroupData group; + group.hits = 1; + group.duration = 1000000; + + bucket.groups[key] = std::move(group); + + std::vector buckets = {std::move(bucket)}; + std::string payload = concentrator.encode_payload(buckets); + + auto j = nlohmann::json::from_msgpack(payload); + + CHECK(j["Hostname"] 
== "host1"); + CHECK(j["Env"] == "prod"); + CHECK(j["Version"] == "1.0"); + CHECK(j["Lang"] == "cpp"); + CHECK(j["TracerVersion"] == "v2.0.1"); + CHECK(j["RuntimeID"] == "abc-123-runtime-id"); + CHECK(j["Sequence"] == 0); + CHECK(j["Service"] == "my-service"); + CHECK(j["Stats"].is_array()); + CHECK(j["Stats"].size() == 1); +} + +STATS_TEST("encode_payload Sequence increments on each call") { + auto http_client = std::make_shared(); + auto logger = std::make_shared(); + HTTPClient::URL agent_url; + agent_url.scheme = "http"; + agent_url.authority = "localhost:8126"; + agent_url.path = ""; + + StatsConcentrator concentrator(http_client, agent_url, logger, "host1", + "prod", "1.0", "my-service", "cpp", "v2.0.1", + "abc-123-runtime-id"); + + StatsBucket bucket; + bucket.start_ns = 1704067200000000000ULL; + bucket.duration_ns = 10000000000ULL; + + StatsAggregationKey key; + key.service = "svc"; + key.name = "web.request"; + key.resource = "/api/test"; + key.type = "web"; + key.is_trace_root = Trilean::TRUE; + + StatsGroupData group; + group.hits = 1; + group.duration = 1000000; + + bucket.groups[key] = std::move(group); + + std::vector buckets = {bucket}; + + // First call: Sequence should be 0. + std::string payload1 = concentrator.encode_payload(buckets); + auto j1 = nlohmann::json::from_msgpack(payload1); + CHECK(j1["Sequence"] == 0); + + // Second call: Sequence should be 1. + std::string payload2 = concentrator.encode_payload(buckets); + auto j2 = nlohmann::json::from_msgpack(payload2); + CHECK(j2["Sequence"] == 1); + + // Third call: Sequence should be 2. + std::string payload3 = concentrator.encode_payload(buckets); + auto j3 = nlohmann::json::from_msgpack(payload3); + CHECK(j3["Sequence"] == 2); +}