Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ cc_library(
"src/datadog/datadog_agent.cpp",
"src/datadog/datadog_agent.h",
"src/datadog/datadog_agent_config.cpp",
"src/datadog/ddsketch.cpp",
"src/datadog/ddsketch.h",
"src/datadog/default_http_client.h",
"src/datadog/default_http_client_null.cpp",
"src/datadog/endpoint_inferral.cpp",
Expand Down Expand Up @@ -52,6 +54,8 @@ cc_library(
"src/datadog/runtime_id.cpp",
"src/datadog/sampling_util.h",
"src/datadog/span.cpp",
"src/datadog/stats_concentrator.cpp",
"src/datadog/stats_concentrator.h",
"src/datadog/span_data.cpp",
"src/datadog/span_data.h",
"src/datadog/span_matcher.cpp",
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ target_sources(dd-trace-cpp-objects
src/datadog/collector_response.cpp
src/datadog/datadog_agent_config.cpp
src/datadog/datadog_agent.cpp
src/datadog/ddsketch.cpp
src/datadog/stats_concentrator.cpp
src/datadog/endpoint_inferral.cpp
src/datadog/environment.cpp
src/datadog/error.cpp
Expand Down
10 changes: 10 additions & 0 deletions include/datadog/datadog_agent_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ struct DatadogAgentConfig {
// How often, in seconds, to query the Datadog Agent for remote configuration
// updates.
Optional<double> remote_configuration_poll_interval_seconds;

// Whether the tracer should compute trace stats locally instead of relying
// on the Datadog Agent. When enabled, the tracer aggregates span metrics
// (hits, errors, duration distributions) and sends them to the Agent via
// POST /v0.6/stats. The Agent can then drop priority-0 traces without
// losing service-level metrics.
//
// Overridden by the DD_TRACE_STATS_COMPUTATION_ENABLED environment variable.
// Default: false.
Optional<bool> stats_computation_enabled;
};

class FinalizedDatadogAgentConfig {
Expand Down
3 changes: 2 additions & 1 deletion include/datadog/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ namespace environment {
MACRO(DD_APM_TRACING_ENABLED, BOOLEAN, true) \
MACRO(DD_TRACE_RESOURCE_RENAMING_ENABLED, BOOLEAN, false) \
MACRO(DD_TRACE_RESOURCE_RENAMING_ALWAYS_SIMPLIFIED_ENDPOINT, BOOLEAN, false) \
MACRO(DD_EXTERNAL_ENV, STRING, "")
MACRO(DD_EXTERNAL_ENV, STRING, "") \
MACRO(DD_TRACE_STATS_COMPUTATION_ENABLED, BOOLEAN, false)

#define ENV_DEFAULT_RESOLVED_IN_CODE(X) X
#define WITH_COMMA(ARG, TYPE, DEFAULT_VALUE) ARG,
Expand Down
30 changes: 30 additions & 0 deletions src/datadog/datadog_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "msgpack.h"
#include "platform_util.h"
#include "span_data.h"
#include "stats_concentrator.h"
#include "telemetry_metrics.h"
#include "trace_sampler.h"

Expand Down Expand Up @@ -169,6 +170,16 @@ DatadogAgent::DatadogAgent(
tracer_signature.library_version);
if (config.stats_computation_enabled) {
headers_.emplace("Datadog-Client-Computed-Stats", "yes");
headers_.emplace("Datadog-Client-Computed-Top-Level", "yes");

// Create the stats concentrator for Client-Side Stats Computation.
stats_concentrator_ = std::make_unique<StatsConcentrator>(
config.http_client, config.url, logger,
/*hostname=*/"", /*env=*/"", /*version=*/"",
/*service=*/std::string(tracer_signature.default_service),
/*lang=*/"cpp",
/*tracer_version=*/tracer_signature.library_version,
/*runtime_id=*/tracer_signature.runtime_id.string());
}

// Origin Detection headers are not necessary when Unix Domain Socket (UDS)
Expand Down Expand Up @@ -209,12 +220,26 @@ DatadogAgent::~DatadogAgent() {

flush();

// Flush any remaining stats at shutdown.
if (stats_concentrator_) {
stats_concentrator_->flush_all();
}

http_client_->drain(deadline);
}

Expected<void> DatadogAgent::send(
std::vector<std::unique_ptr<SpanData>>&& spans,
const std::shared_ptr<TraceSampler>& response_handler) {
// Feed eligible spans to the stats concentrator (if enabled).
if (stats_concentrator_) {
for (const auto& span_ptr : spans) {
if (span_ptr) {
stats_concentrator_->add(*span_ptr);
}
}
}

std::lock_guard<std::mutex> lock(mutex_);
trace_chunks_.push_back(TraceChunk{std::move(spans), response_handler});
return nullopt;
Expand Down Expand Up @@ -368,6 +393,11 @@ void DatadogAgent::flush() {
logger_->log_error(
error->with_prefix("Unexpected error submitting traces: "));
}

// Also flush stats if Client-Side Stats is enabled.
if (stats_concentrator_) {
stats_concentrator_->flush(clock_());
}
}

void DatadogAgent::get_and_apply_remote_configuration_updates() {
Expand Down
5 changes: 5 additions & 0 deletions src/datadog/datadog_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <vector>

#include "remote_config/remote_config.h"
#include "stats_concentrator.h"

namespace datadog {
namespace tracing {
Expand Down Expand Up @@ -52,6 +53,10 @@ class DatadogAgent : public Collector {

std::unordered_map<std::string, std::string> headers_;

// Client-Side Stats Computation: when enabled, this concentrator aggregates
// span metrics and flushes them to the Agent via POST /v0.6/stats.
std::unique_ptr<StatsConcentrator> stats_concentrator_;

void flush();

public:
Expand Down
11 changes: 9 additions & 2 deletions src/datadog/datadog_agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,15 @@ Expected<FinalizedDatadogAgentConfig> finalize_config(
result.admission_controller_uid = std::string(*external_env);
}

// Not supported yet but required for APM tracing disablement.
result.stats_computation_enabled = false;
// Client-Side Stats Computation. Default: false.
// Overridden by DD_TRACE_STATS_COMPUTATION_ENABLED env var.
if (auto stats_env =
lookup(environment::DD_TRACE_STATS_COMPUTATION_ENABLED)) {
result.stats_computation_enabled = !falsy(*stats_env);
} else {
result.stats_computation_enabled =
user_config.stats_computation_enabled.value_or(false);
}

return result;
}
Expand Down
170 changes: 170 additions & 0 deletions src/datadog/ddsketch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include "ddsketch.h"

#include <algorithm>
#include <cassert>
#include <cmath>
#include <limits>

#include "msgpack.h"

namespace datadog {
namespace tracing {

DDSketch::DDSketch(double relative_accuracy, std::size_t max_num_bins)
: max_num_bins_(max_num_bins) {
assert(relative_accuracy > 0.0 && relative_accuracy < 1.0);
assert(max_num_bins > 0);

// gamma = (1 + alpha) / (1 - alpha)
gamma_ = (1.0 + relative_accuracy) / (1.0 - relative_accuracy);
log_gamma_ = std::log(gamma_);
}

int DDSketch::key(double value) const {
// Map value to a bin index using log(value) / log(gamma).
// The index is ceiling of log_gamma(value).
return static_cast<int>(std::ceil(std::log(value) / log_gamma_));
}

double DDSketch::lower_bound(int index) const {
return std::pow(gamma_, index - 1);
}

void DDSketch::add(double value) {
if (value < 0.0) {
value = 0.0;
}

count_++;
sum_ += value;

if (count_ == 1) {
min_ = value;
max_ = value;
} else {
if (value < min_) min_ = value;
if (value > max_) max_ = value;
}

// Values that are effectively zero go into the zero bucket.
// Using a threshold comparable to the smallest representable bin.
constexpr double min_positive = 1e-9; // 1 nanosecond
if (value <= min_positive) {
zero_count_++;
return;
}

int k = key(value);

if (bins_.empty()) {
min_key_ = k;
bins_.push_back(1);
return;
}

// Expand bins_ to cover the new key.
if (k < min_key_) {
// Prepend bins.
int prepend = min_key_ - k;
bins_.insert(bins_.begin(), static_cast<std::size_t>(prepend), 0);
min_key_ = k;
bins_[0] = 1;
} else if (k >= min_key_ + static_cast<int>(bins_.size())) {
// Append bins.
bins_.resize(static_cast<std::size_t>(k - min_key_ + 1), 0);
bins_[static_cast<std::size_t>(k - min_key_)] = 1;
} else {
bins_[static_cast<std::size_t>(k - min_key_)]++;
}

// If we have exceeded the max number of bins, collapse by merging
// adjacent bins (pairs from the left).
while (bins_.size() > max_num_bins_) {
std::vector<std::uint64_t> merged;
merged.reserve((bins_.size() + 1) / 2);
for (std::size_t i = 0; i < bins_.size(); i += 2) {
std::uint64_t c = bins_[i];
if (i + 1 < bins_.size()) {
c += bins_[i + 1];
}
merged.push_back(c);
}
// After merging pairs, the new min_key_ corresponds to
// the original min_key_ / 2 (floor division for the mapping).
// Actually, for log-based indexing, merging adjacent bins means
// each new bin covers twice the range, which is equivalent to
// halving the resolution. For simplicity, we just reassign.
min_key_ = min_key_ / 2;
bins_ = std::move(merged);
}
}

std::uint64_t DDSketch::count() const { return count_; }

double DDSketch::sum() const { return sum_; }

double DDSketch::min() const { return count_ > 0 ? min_ : 0.0; }

double DDSketch::max() const { return count_ > 0 ? max_ : 0.0; }

double DDSketch::avg() const {
return count_ > 0 ? sum_ / static_cast<double>(count_) : 0.0;
}

bool DDSketch::empty() const { return count_ == 0; }

void DDSketch::clear() {
bins_.clear();
min_key_ = 0;
count_ = 0;
zero_count_ = 0;
sum_ = 0.0;
min_ = 0.0;
max_ = 0.0;
}

void DDSketch::msgpack_encode(std::string& destination) const {
// The DDSketch proto message has 3 fields:
// mapping: {gamma, index_offset, interpolation}
// positive_values: {contiguous_bin_counts, contiguous_bin_index_offset}
// zero_count
//
// We encode this as a msgpack map.

// Mapping sub-map
// interpolation: 0 = NONE (logarithmic)
// index_offset: 0 (we use the raw index)

// clang-format off
msgpack::pack_map(destination, 3);

// 1) "mapping"
msgpack::pack_string(destination, "mapping");
msgpack::pack_map(destination, 3);
msgpack::pack_string(destination, "gamma");
msgpack::pack_double(destination, gamma_);
msgpack::pack_string(destination, "indexOffset");
msgpack::pack_double(destination, 0.0);
msgpack::pack_string(destination, "interpolation");
msgpack::pack_integer(destination, std::int64_t(0)); // NONE

// 2) "positiveValues"
msgpack::pack_string(destination, "positiveValues");
msgpack::pack_map(destination, 2);
msgpack::pack_string(destination, "contiguousBinCounts");
// Pack as array of doubles (matching the proto float64 repeated field).
msgpack::pack_array(destination, bins_.size());
for (auto c : bins_) {
msgpack::pack_double(destination, static_cast<double>(c));
}
msgpack::pack_string(destination, "contiguousBinIndexOffset");
msgpack::pack_integer(destination, std::int64_t(min_key_));

// 3) "zeroCount"
msgpack::pack_string(destination, "zeroCount");
msgpack::pack_double(destination, static_cast<double>(zero_count_));
// clang-format on
}

} // namespace tracing
} // namespace datadog
Loading
Loading