From 62d56beb9c7b0c51fc2d3f6314f2613815396bbb Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Wed, 18 Mar 2026 08:51:51 +0000 Subject: [PATCH 1/5] nh support for metrics aggregation Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- pkg/util/metrics_helper.go | 296 ++++++++++++++++++++++++++++++-- pkg/util/metrics_helper_test.go | 267 ++++++++++++++++++++++++++++ 3 files changed, 550 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fef5cebcf3..9a2dc315bb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased - +* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #6489 ## 1.21.0 in progress * [CHANGE] Ruler: Graduate Ruler API from experimental. #7312 diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 9aafae7afba..cc7794e88a1 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "sort" "strings" "sync" @@ -467,10 +468,77 @@ func (s *SummaryData) Metric(desc *prometheus.Desc, labelValues ...string) prome } // HistogramData keeps data required to build histogram Metric. +// +// For native histograms, the Schema and ZeroThreshold are set from the first +// native histogram encountered. All aggregated histograms should use the same +// schema, as bucket indices have different meanings across different schemas. type HistogramData struct { sampleCount uint64 sampleSum float64 buckets map[float64]uint64 + + // Native histogram fields + nativeMode bool + Schema int32 + ZeroThreshold float64 + ZeroCount uint64 + PositiveBuckets map[int]int64 // bucket index -> count + NegativeBuckets map[int]int64 +} + +// isNative returns true if the dto.Histogram carries native histogram data. +// The authoritative signal is Schema being set (non-nil), which client_golang +// always populates for native/classic histograms regardless of observation count. +// Spans alone are insufficient because a zero-observation dual histogram has +// no spans but still has Schema set. +func isNative(histo *dto.Histogram) bool { + return histo.Schema != nil +} + +func (d *HistogramData) hasNative() bool { + return d.nativeMode +} + +// spansCountsToBucketMap converts native histogram spans and absolute bucket counts +// into a map of bucket index -> count for easier aggregation. +func spansCountsToBucketMap(spans []*dto.BucketSpan, counts []int64) map[int]int64 { + if len(spans) == 0 { + return nil + } + bucketMap := make(map[int]int64, len(counts)) + var idx int32 + bucketIdx := 0 + for _, sp := range spans { + idx += sp.GetOffset() + for j := 0; j < int(sp.GetLength()) && bucketIdx < len(counts); j++ { + bucketMap[int(idx)] += counts[bucketIdx] + idx++ + bucketIdx++ + } + } + return bucketMap +} + +// deltasToCountsInt converts delta-encoded bucket counts to absolute counts. +func deltasToCountsInt(deltas []int64) []int64 { + counts := make([]int64, len(deltas)) + var cur int64 + for i, d := range deltas { + cur += int64(d) + counts[i] = cur + } + return counts +} + +// mergeBucketMaps merges src bucket map into dst bucket map by summing counts for each bucket index. +func mergeBucketMaps(dst, src map[int]int64) map[int]int64 { + if dst == nil { + dst = make(map[int]int64) + } + for idx, count := range src { + dst[idx] += count + } + return dst } // AddHistogram adds histogram from gathered metrics to this histogram data. @@ -481,6 +549,26 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { d.sampleCount += histo.GetSampleCount() d.sampleSum += histo.GetSampleSum() + if isNative(histo) { + // Initialize schema/threshold on first native histogram + if !d.hasNative() { + d.Schema = histo.GetSchema() + d.ZeroThreshold = histo.GetZeroThreshold() + d.nativeMode = true + } + d.ZeroCount += histo.GetZeroCount() + + posCounts := deltasToCountsInt(histo.GetPositiveDelta()) + negCounts := deltasToCountsInt(histo.GetNegativeDelta()) + + posMap := spansCountsToBucketMap(histo.GetPositiveSpan(), posCounts) + negMap := spansCountsToBucketMap(histo.GetNegativeSpan(), negCounts) + + d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, posMap) + d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, negMap) + } + + // Always collect classic buckets histoBuckets := histo.GetBucket() if len(histoBuckets) > 0 && d.buckets == nil { d.buckets = map[float64]uint64{} @@ -499,7 +587,18 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { func (d *HistogramData) AddHistogramData(histo HistogramData) { d.sampleCount += histo.sampleCount d.sampleSum += histo.sampleSum + if histo.hasNative() { + if !d.hasNative() { + d.Schema = histo.Schema + d.ZeroThreshold = histo.ZeroThreshold + d.nativeMode = true + } + d.ZeroCount += histo.ZeroCount + d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, histo.PositiveBuckets) + d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, histo.NegativeBuckets) + } + // Always merge classic buckets. if len(histo.buckets) > 0 && d.buckets == nil { d.buckets = map[float64]uint64{} } @@ -510,11 +609,84 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) { } } +// nativeHistogramMetric is basically the same as constNativeHistogram struct in prometheus histogram.go +// we need to create this new struct because the existing method NewConstNativeHistogram method in prometheus +// does not populate classic histogram fields. without this the NH compatible metrics are only exposed in NH format +// and classic histogram buckets are not exposed. +type nativeHistogramMetric struct { + desc *prometheus.Desc + dto.Histogram + labelPairs []*dto.LabelPair +} + +func (m *nativeHistogramMetric) Desc() *prometheus.Desc { return m.desc } +func (m *nativeHistogramMetric) Write(out *dto.Metric) error { + out.Histogram = &m.Histogram + out.Label = m.labelPairs + return nil +} + // Metric returns prometheus metric from this histogram data. // // Note that returned metric shares bucket with this HistogramData, so avoid // doing more modifications to this HistogramData after calling Metric. func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric { + if d.hasNative() { + // Build native spans+deltas from bucket maps. + posSpans, posDeltas := makeBucketsFromMap(d.PositiveBuckets) + negSpans, negDeltas := makeBucketsFromMap(d.NegativeBuckets) + + schema := d.Schema + zt := d.ZeroThreshold + sc := d.sampleCount + ss := d.sampleSum + zc := d.ZeroCount + + // Build classic buckets if available + var buckets []*dto.Bucket + if len(d.buckets) > 0 { + buckets = make([]*dto.Bucket, 0, len(d.buckets)) + for ub, cc := range d.buckets { + upperBound := ub + cumCount := cc + buckets = append(buckets, &dto.Bucket{ + UpperBound: &upperBound, + CumulativeCount: &cumCount, + }) + } + sort.Slice(buckets, func(i, j int) bool { + return buckets[i].GetUpperBound() < buckets[j].GetUpperBound() + }) + } + + // Sentinel span for native histograms with no observations. + // This is required to distinguish an empty native histogram from a classic histogram. + // This matches the prometheus behavior (histogram.go:1958) + if zt == 0 && zc == 0 && len(posSpans) == 0 && len(negSpans) == 0 { + posSpans = []*dto.BucketSpan{{ + Offset: proto.Int32(0), + Length: proto.Uint32(0), + }} + } + + // Construct histogram in-place within the struct (no intermediate copy) + return &nativeHistogramMetric{ + desc: desc, + Histogram: dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: &sc, + SampleSum: &ss, + ZeroCount: &zc, + PositiveSpan: posSpans, + PositiveDelta: posDeltas, + NegativeSpan: negSpans, + NegativeDelta: negDeltas, + Bucket: buckets, + }, + labelPairs: prometheus.MakeLabelPairs(desc, labelValues), + } + } return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...) } @@ -897,24 +1069,120 @@ func mergeCounter(mf1, mf2 *dto.Metric) { } func mergeHistogram(mf1, mf2 *dto.Metric) { - bucketMap := map[float64]uint64{} - - for _, bucket := range append(mf1.Histogram.GetBucket(), mf2.Histogram.GetBucket()...) { - bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() - } - - var newBucket []*dto.Bucket - for upperBound, cumulativeCount := range bucketMap { - ubValue := upperBound - ccValue := cumulativeCount - newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) - } - newSampleCount := *mf1.Histogram.SampleCount + *mf2.Histogram.SampleCount newSampleSum := *mf1.Histogram.SampleSum + *mf2.Histogram.SampleSum - mf1.Histogram.Bucket = newBucket mf1.Histogram.SampleCount = &newSampleCount mf1.Histogram.SampleSum = &newSampleSum + + h1 := mf1.Histogram + h2 := mf2.Histogram + + // Merge native histogram data if present. + // We'll process both native AND classic data below and expose in both formats + if isNative(h1) || isNative(h2) { + // Use schema/threshold from whichever side has native data (they should match). + if !isNative(h1) { + schema := h2.GetSchema() + h1.Schema = &schema + zt := h2.GetZeroThreshold() + h1.ZeroThreshold = &zt + } + + // Merge zero bucket counts + zc := h1.GetZeroCount() + h2.GetZeroCount() + h1.ZeroCount = &zc + + // Convert spans+deltas to bucket maps, merge them, then convert back + posCounts1 := deltasToCountsInt(h1.GetPositiveDelta()) + posCounts2 := deltasToCountsInt(h2.GetPositiveDelta()) + negCounts1 := deltasToCountsInt(h1.GetNegativeDelta()) + negCounts2 := deltasToCountsInt(h2.GetNegativeDelta()) + + posMap := mergeBucketMaps( + spansCountsToBucketMap(h1.GetPositiveSpan(), posCounts1), + spansCountsToBucketMap(h2.GetPositiveSpan(), posCounts2), + ) + negMap := mergeBucketMaps( + spansCountsToBucketMap(h1.GetNegativeSpan(), negCounts1), + spansCountsToBucketMap(h2.GetNegativeSpan(), negCounts2), + ) + + h1.PositiveSpan, h1.PositiveDelta = makeBucketsFromMap(posMap) + h1.NegativeSpan, h1.NegativeDelta = makeBucketsFromMap(negMap) + } + + // Merge classic histogram buckets if present. + if len(h1.GetBucket()) > 0 || len(h2.GetBucket()) > 0 { + bucketMap := map[float64]uint64{} + for _, bucket := range append(h1.GetBucket(), h2.GetBucket()...) { + bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() + } + + var newBucket []*dto.Bucket + for upperBound, cumulativeCount := range bucketMap { + ubValue := upperBound + ccValue := cumulativeCount + newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) + } + h1.Bucket = newBucket + } +} + +// bucketMapToSpansDeltas converts a bucket index->count map back into the +// spans+deltas encoding used by the native histogram proto representation. +// +// This implementation is the same as makeBucketsFromMap from prometheus +// (histogram.go:2006) to include the gap-filling optimization +func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) { + if len(buckets) == 0 { + return nil, nil + } + var ii []int + for k := range buckets { + ii = append(ii, k) + } + sort.Ints(ii) + + var ( + spans []*dto.BucketSpan + deltas []int64 + prevCount int64 + nextI int + ) + + appendDelta := func(count int64) { + *spans[len(spans)-1].Length++ + deltas = append(deltas, count-prevCount) + prevCount = count + } + + for n, i := range ii { + count := buckets[i] + // Multiple spans with only small gaps in between are probably + // encoded more efficiently as one larger span with a few empty + // buckets. Needs some research to find the sweet spot. For now, + // we assume that gaps of one or two buckets should not create + // a new span. + iDelta := int32(i - nextI) + if n == 0 || iDelta > 2 { + // We have to create a new span, either because we are + // at the very beginning, or because we have found a gap + // of more than two buckets. + spans = append(spans, &dto.BucketSpan{ + Offset: proto.Int32(iDelta), + Length: proto.Uint32(0), + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < iDelta; j++ { + appendDelta(0) + } + } + appendDelta(count) + nextI = i + 1 + } + return spans, deltas } func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) { diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go index 85d9895389b..0b3dbc52b62 100644 --- a/pkg/util/metrics_helper_test.go +++ b/pkg/util/metrics_helper_test.go @@ -1258,3 +1258,270 @@ func verifyLabels(t *testing.T, m prometheus.Collector, filter map[string]string require.Equal(t, expectedLabels, result) } + +// TestIsNative tests the native histogram detection function +func TestIsNative(t *testing.T) { + // Native histogram has Schema set + schema := int32(0) + nativeHisto := &dto.Histogram{ + Schema: &schema, + } + require.True(t, isNative(nativeHisto)) + + // Classic histogram has no Schema + classicHisto := &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + }, + } + require.False(t, isNative(classicHisto)) + + // Empty histogram + require.False(t, isNative(&dto.Histogram{})) +} + +// TestHistogramData_AddHistogram_Native tests adding native histograms +func TestHistogramData_AddHistogram_Native(t *testing.T) { + hd := &HistogramData{} + + // Create a native histogram + schema := int32(0) + zt := 0.001 + histo := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, // deltas: [5, 3] -> counts: [5, 8] + NegativeSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + NegativeDelta: []int64{2}, // deltas: [2] -> counts: [2] + } + + hd.AddHistogram(histo) + + require.True(t, hd.hasNative()) + require.Equal(t, int32(0), hd.Schema) + require.Equal(t, 0.001, hd.ZeroThreshold) + require.Equal(t, uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(8), hd.PositiveBuckets[1]) + require.Equal(t, int64(2), hd.NegativeBuckets[0]) +} + +// TestHistogramData_AddHistogram_Classic tests adding classic histograms +func TestHistogramData_AddHistogram_Classic(t *testing.T) { + hd := &HistogramData{} + + histo := &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + {UpperBound: proto.Float64(10.0), CumulativeCount: proto.Uint64(10)}, + }, + } + + hd.AddHistogram(histo) + + require.False(t, hd.hasNative()) + require.Equal(t, uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) + require.Equal(t, uint64(10), hd.buckets[10.0]) +} + +// TestHistogramData_AddHistogram_DualFormat tests adding dual-format histograms (both native and classic) +func TestHistogramData_AddHistogram_DualFormat(t *testing.T) { + hd := &HistogramData{} + + schema := int32(0) + zt := 0.001 + histo := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + }, + } + + hd.AddHistogram(histo) + + // Both native and classic should be populated + require.True(t, hd.hasNative()) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) +} + +// TestHistogramData_AddHistogram_Multiple tests merging multiple histograms +func TestHistogramData_AddHistogram_Multiple(t *testing.T) { + hd := &HistogramData{} + + schema := int32(0) + zt := 0.001 + + // First histogram + histo1 := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, // counts: [5, 8] + } + + // Second histogram + histo2 := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(1), Length: proto.Uint32(1)}, // bucket 1 + }, + PositiveDelta: []int64{2}, // counts: [2] at bucket 1 + } + + hd.AddHistogram(histo1) + hd.AddHistogram(histo2) + + require.Equal(t, uint64(15), hd.sampleCount) // 10 + 5 + require.Equal(t, 150.0, hd.sampleSum) // 100 + 50 + require.Equal(t, uint64(3), hd.ZeroCount) // 2 + 1 + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(10), hd.PositiveBuckets[1]) // 8 + 2 +} + +// TestMakeBucketsFromMap tests the conversion from bucket map to spans and deltas +func TestMakeBucketsFromMap(t *testing.T) { + tests := []struct { + name string + buckets map[int]int64 + expectedSpans int + expectedDeltas []int64 + }{ + { + name: "empty bucket map", + buckets: map[int]int64{}, + expectedSpans: 0, + expectedDeltas: nil, + }, + { + name: "single bucket", + buckets: map[int]int64{0: 5}, + expectedSpans: 1, + expectedDeltas: []int64{5}, + }, + { + name: "contiguous buckets", + buckets: map[int]int64{0: 5, 1: 8, 2: 3}, + expectedSpans: 1, + expectedDeltas: []int64{5, 3, -5}, // deltas from previous count + }, + { + name: "buckets with small gap (filled)", + buckets: map[int]int64{0: 5, 2: 3}, // gap of 1 + expectedSpans: 1, + expectedDeltas: []int64{5, -5, 3}, // includes zero for gap + }, + { + name: "buckets with large gap (new span)", + buckets: map[int]int64{0: 5, 5: 3}, // gap of 4 + expectedSpans: 2, + expectedDeltas: []int64{5, -2}, // deltas carry across spans: bucket0=5, bucket5=3-5=-2 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spans, deltas := makeBucketsFromMap(tt.buckets) + require.Equal(t, tt.expectedSpans, len(spans)) + if tt.expectedDeltas != nil { + require.Equal(t, tt.expectedDeltas, deltas) + } + }) + } +} + +// TestMergeHistogram tests the mergeHistogram function +func TestMergeHistogram(t *testing.T) { + schema := int32(0) + zt := 0.001 + + // First metric with native histogram + m1 := &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + }, + }, + } + + // Second metric with native histogram + m2 := &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{3}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + }, + }, + } + + mergeHistogram(m1, m2) + + h := m1.Histogram + require.Equal(t, uint64(15), h.GetSampleCount()) // 10 + 5 + require.Equal(t, 150.0, h.GetSampleSum()) // 100 + 50 + require.Equal(t, uint64(3), h.GetZeroCount()) // 2 + 1 + + // Check classic buckets merged + require.Equal(t, 1, len(h.Bucket)) + require.Equal(t, uint64(8), h.Bucket[0].GetCumulativeCount()) // 5 + 3 + + // Check native buckets merged (should be 5 + 3 = 8) + posCounts := deltasToCountsInt(h.GetPositiveDelta()) + require.Equal(t, 1, len(posCounts)) + require.Equal(t, int64(8), posCounts[0]) +} From 4740331120378f2df8b564194cb19bc57ab9f759 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Thu, 19 Mar 2026 19:38:55 +0000 Subject: [PATCH 2/5] small fix: make modernize Signed-off-by: Shvejan Mutheboyina --- pkg/util/metrics_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index cc7794e88a1..168034b772d 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -1175,7 +1175,7 @@ func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) { } else { // We have found a small gap (or no gap at all). // Insert empty buckets as needed. - for j := int32(0); j < iDelta; j++ { + for range iDelta { appendDelta(0) } } From ba81824f6fb3e85eb19602e7cc80e1ce7d39bf05 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> Date: Fri, 20 Mar 2026 13:39:47 -0700 Subject: [PATCH 3/5] Fix PR number in changelog Co-authored-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81fbd31fb48..26810070274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased -* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #6489 +* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 ## 1.21.0 in progress From 568fc2d45e313564153d4674268960259dd22b10 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Wed, 25 Mar 2026 19:33:33 +0000 Subject: [PATCH 4/5] fixup! nh support for metrics aggregation Signed-off-by: Shvejan Mutheboyina --- integration/dual_mode_metrics_test.go | 147 +++++++++ pkg/util/metrics_helper.go | 25 +- pkg/util/metrics_helper_test.go | 417 ++++++++++++++++---------- 3 files changed, 414 insertions(+), 175 deletions(-) create mode 100644 integration/dual_mode_metrics_test.go diff --git a/integration/dual_mode_metrics_test.go b/integration/dual_mode_metrics_test.go new file mode 100644 index 00000000000..b05a47cb738 --- /dev/null +++ b/integration/dual_mode_metrics_test.go @@ -0,0 +1,147 @@ +//go:build requires_docker + +package integration + +import ( + "fmt" + "io" + "math" + "net/http" + "testing" + "time" + + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func scrapeMetricsProtobuf(endpoint string) (map[string]*io_prometheus_client.MetricFamily, error) { + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Accept", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited") + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode) + } + + families := make(map[string]*io_prometheus_client.MetricFamily) + decoder := expfmt.NewDecoder(resp.Body, expfmt.FmtProtoDelim) + + for { + mf := &io_prometheus_client.MetricFamily{} + err := decoder.Decode(mf) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + families[mf.GetName()] = mf + } + + return families, nil +} + +// TestDualModeHistogramExposition validates cortex_ingester_tsdb_compaction_duration_seconds +// is exposed in dual mode with both classic buckets and native histogram fields. +func TestDualModeHistogramExposition(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.enable-native-histograms": "true", + "-blocks-storage.tsdb.head-compaction-interval": "1s", + "-blocks-storage.tsdb.head-compaction-idle-timeout": "1s", + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-alertmanager.cluster.listen-address": "127.0.0.1:9094", + "-alertmanager.cluster.advertise-address": "127.0.0.1:9094", + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + }) + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + baseTime := time.Now() + for i := 0; i < 100; i++ { + series := []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric"}, + {Name: "job", Value: "test"}, + {Name: "instance", Value: fmt.Sprintf("instance-%d", i%10)}, + }, + Samples: []prompb.Sample{ + {Value: float64(i), Timestamp: e2e.TimeToMilliseconds(baseTime.Add(time.Duration(i) * time.Second))}, + }, + }} + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(100), "cortex_ingester_ingested_samples_total")) + time.Sleep(5 * time.Second) + + families, err := scrapeMetricsProtobuf(fmt.Sprintf("http://%s/metrics", cortex.HTTPEndpoint())) + require.NoError(t, err) + + histFamily, ok := families["cortex_ingester_tsdb_compaction_duration_seconds"] + require.True(t, ok) + require.Equal(t, io_prometheus_client.MetricType_HISTOGRAM, histFamily.GetType()) + + metrics := histFamily.GetMetric() + require.NotEmpty(t, metrics) + + expectedBuckets := []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, math.Inf(1)} + + for _, metric := range metrics { + h := metric.GetHistogram() + require.NotNil(t, h) + + buckets := h.GetBucket() + require.Equal(t, 14, len(buckets)) // testing classic histogram custom buckets + + for i, bucket := range buckets { + require.Equal(t, expectedBuckets[i], bucket.GetUpperBound()) // testing classic histogram custom bucket boundaries + } + + // Testing Native histogram fields + require.Equal(t, int32(3), h.GetSchema()) + require.NotNil(t, h.GetZeroThreshold()) + require.Equal(t, uint64(0), h.GetZeroCount()) + + sampleCount := h.GetSampleCount() + sampleSum := h.GetSampleSum() + require.Greater(t, sampleSum, 0.0) + require.Equal(t, sampleCount, buckets[len(buckets)-1].GetCumulativeCount()) + } +} diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 168034b772d..6dbe6348dba 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -609,10 +609,11 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) { } } -// nativeHistogramMetric is basically the same as constNativeHistogram struct in prometheus histogram.go -// we need to create this new struct because the existing method NewConstNativeHistogram method in prometheus -// does not populate classic histogram fields. without this the NH compatible metrics are only exposed in NH format -// and classic histogram buckets are not exposed. +// nativeHistogramMetric enables dual-format histogram exposition (both native and classic simultaneously). +// Similar to prometheus/client_golang's constNativeHistogram (see +// https://github.com/prometheus/client_golang/blob/v1.23.2/prometheus/histogram.go#L1865-L1869) +// but populates both native histogram fields (spans/deltas) and classic bucket fields, +// allowing scrapers to choose which format to use via content negotiation. type nativeHistogramMetric struct { desc *prometheus.Desc dto.Histogram @@ -661,7 +662,8 @@ func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) pro // Sentinel span for native histograms with no observations. // This is required to distinguish an empty native histogram from a classic histogram. - // This matches the prometheus behavior (histogram.go:1958) + // Matches prometheus/client_golang behavior: + // https://github.com/prometheus/client_golang/blob/v1.23.2/prometheus/histogram.go#L1958-L1962 if zt == 0 && zc == 0 && len(posSpans) == 0 && len(negSpans) == 0 { posSpans = []*dto.BucketSpan{{ Offset: proto.Int32(0), @@ -1080,7 +1082,10 @@ func mergeHistogram(mf1, mf2 *dto.Metric) { // Merge native histogram data if present. // We'll process both native AND classic data below and expose in both formats if isNative(h1) || isNative(h2) { - // Use schema/threshold from whichever side has native data (they should match). + // Use schema/threshold from whichever side has native data. + // Assumption: when both have native data, they share the same schema/zero_threshold. + // This holds for SendSumOfHistograms use case which aggregates metrics from the same + // prometheus client library histogram across multiple instances. if !isNative(h1) { schema := h2.GetSchema() h1.Schema = &schema @@ -1128,11 +1133,13 @@ func mergeHistogram(mf1, mf2 *dto.Metric) { } } -// bucketMapToSpansDeltas converts a bucket index->count map back into the +// makeBucketsFromMap converts a bucket index->count map back into the // spans+deltas encoding used by the native histogram proto representation. // -// This implementation is the same as makeBucketsFromMap from prometheus -// (histogram.go:2006) to include the gap-filling optimization +// Implementation adapted from prometheus/client_golang's makeBucketsFromMap +// (https://github.com/prometheus/client_golang/blob/v1.23.2/prometheus/histogram.go#L2006-L2056) +// with the gap-filling optimization: gaps of 1-2 empty buckets are filled with zeros +// rather than creating new spans, resulting in more efficient encoding. func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) { if len(buckets) == 0 { return nil, nil diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go index 0b3dbc52b62..7ee324e3dda 100644 --- a/pkg/util/metrics_helper_test.go +++ b/pkg/util/metrics_helper_test.go @@ -1283,138 +1283,140 @@ func TestIsNative(t *testing.T) { } // TestHistogramData_AddHistogram_Native tests adding native histograms -func TestHistogramData_AddHistogram_Native(t *testing.T) { - hd := &HistogramData{} - - // Create a native histogram +func TestHistogramData_AddHistogram(t *testing.T) { schema := int32(0) zt := 0.001 - histo := &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(10), - SampleSum: proto.Float64(100.0), - ZeroCount: proto.Uint64(2), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(2)}, - }, - PositiveDelta: []int64{5, 3}, // deltas: [5, 3] -> counts: [5, 8] - NegativeSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(1)}, - }, - NegativeDelta: []int64{2}, // deltas: [2] -> counts: [2] - } - - hd.AddHistogram(histo) - - require.True(t, hd.hasNative()) - require.Equal(t, int32(0), hd.Schema) - require.Equal(t, 0.001, hd.ZeroThreshold) - require.Equal(t, uint64(10), hd.sampleCount) - require.Equal(t, 100.0, hd.sampleSum) - require.Equal(t, uint64(2), hd.ZeroCount) - require.Equal(t, int64(5), hd.PositiveBuckets[0]) - require.Equal(t, int64(8), hd.PositiveBuckets[1]) - require.Equal(t, int64(2), hd.NegativeBuckets[0]) -} - -// TestHistogramData_AddHistogram_Classic tests adding classic histograms -func TestHistogramData_AddHistogram_Classic(t *testing.T) { - hd := &HistogramData{} - histo := &dto.Histogram{ - SampleCount: proto.Uint64(10), - SampleSum: proto.Float64(100.0), - Bucket: []*dto.Bucket{ - {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, - {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, - {UpperBound: proto.Float64(10.0), CumulativeCount: proto.Uint64(10)}, + tests := []struct { + name string + histograms []*dto.Histogram + validate func(t *testing.T, hd *HistogramData) + }{ + { + name: "native histogram", + histograms: []*dto.Histogram{ + { + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, + NegativeSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + NegativeDelta: []int64{2}, + }, + }, + validate: func(t *testing.T, hd *HistogramData) { + require.True(t, hd.hasNative()) + require.Equal(t, int32(0), hd.Schema) + require.Equal(t, 0.001, hd.ZeroThreshold) + require.Equal(t, uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(8), hd.PositiveBuckets[1]) + require.Equal(t, int64(2), hd.NegativeBuckets[0]) + }, }, - } - - hd.AddHistogram(histo) - - require.False(t, hd.hasNative()) - require.Equal(t, uint64(10), hd.sampleCount) - require.Equal(t, 100.0, hd.sampleSum) - require.Equal(t, uint64(5), hd.buckets[1.0]) - require.Equal(t, uint64(8), hd.buckets[5.0]) - require.Equal(t, uint64(10), hd.buckets[10.0]) -} - -// TestHistogramData_AddHistogram_DualFormat tests adding dual-format histograms (both native and classic) -func TestHistogramData_AddHistogram_DualFormat(t *testing.T) { - hd := &HistogramData{} - - schema := int32(0) - zt := 0.001 - histo := &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(10), - SampleSum: proto.Float64(100.0), - ZeroCount: proto.Uint64(2), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + { + name: "classic histogram", + histograms: []*dto.Histogram{ + { + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + {UpperBound: proto.Float64(10.0), CumulativeCount: proto.Uint64(10)}, + }, + }, + }, + validate: func(t *testing.T, hd *HistogramData) { + require.False(t, hd.hasNative()) + require.Equal(t, uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) + require.Equal(t, uint64(10), hd.buckets[10.0]) + }, }, - PositiveDelta: []int64{5}, - Bucket: []*dto.Bucket{ - {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, - {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + { + name: "dual format histogram", + histograms: []*dto.Histogram{ + { + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + }, + }, + }, + validate: func(t *testing.T, hd *HistogramData) { + require.True(t, hd.hasNative()) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) + }, }, - } - - hd.AddHistogram(histo) - - // Both native and classic should be populated - require.True(t, hd.hasNative()) - require.Equal(t, uint64(2), hd.ZeroCount) - require.Equal(t, int64(5), hd.PositiveBuckets[0]) - require.Equal(t, uint64(5), hd.buckets[1.0]) - require.Equal(t, uint64(8), hd.buckets[5.0]) -} - -// TestHistogramData_AddHistogram_Multiple tests merging multiple histograms -func TestHistogramData_AddHistogram_Multiple(t *testing.T) { - hd := &HistogramData{} - - schema := int32(0) - zt := 0.001 - - // First histogram - histo1 := &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(10), - SampleSum: proto.Float64(100.0), - ZeroCount: proto.Uint64(2), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + { + name: "multiple histograms", + histograms: []*dto.Histogram{ + { + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, + }, + { + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(1), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{2}, + }, + }, + validate: func(t *testing.T, hd *HistogramData) { + require.Equal(t, uint64(15), hd.sampleCount) + require.Equal(t, 150.0, hd.sampleSum) + require.Equal(t, uint64(3), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(10), hd.PositiveBuckets[1]) + }, }, - PositiveDelta: []int64{5, 3}, // counts: [5, 8] } - // Second histogram - histo2 := &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(5), - SampleSum: proto.Float64(50.0), - ZeroCount: proto.Uint64(1), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(1), Length: proto.Uint32(1)}, // bucket 1 - }, - PositiveDelta: []int64{2}, // counts: [2] at bucket 1 + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hd := &HistogramData{} + for _, h := range tt.histograms { + hd.AddHistogram(h) + } + tt.validate(t, hd) + }) } - - hd.AddHistogram(histo1) - hd.AddHistogram(histo2) - - require.Equal(t, uint64(15), hd.sampleCount) // 10 + 5 - require.Equal(t, 150.0, hd.sampleSum) // 100 + 50 - require.Equal(t, uint64(3), hd.ZeroCount) // 2 + 1 - require.Equal(t, int64(5), hd.PositiveBuckets[0]) - require.Equal(t, int64(10), hd.PositiveBuckets[1]) // 8 + 2 } // TestMakeBucketsFromMap tests the conversion from bucket map to spans and deltas @@ -1468,60 +1470,143 @@ func TestMakeBucketsFromMap(t *testing.T) { } } -// TestMergeHistogram tests the mergeHistogram function +// TestMergeHistogram tests the mergeHistogram function with various histogram format combinations func TestMergeHistogram(t *testing.T) { schema := int32(0) zt := 0.001 - // First metric with native histogram - m1 := &dto.Metric{ - Histogram: &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(10), - SampleSum: proto.Float64(100.0), - ZeroCount: proto.Uint64(2), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + tests := []struct { + name string + m1 *dto.Metric + m2 *dto.Metric + validate func(t *testing.T, h *dto.Histogram) + }{ + { + name: "dual-mode + dual-mode", + m1: &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + }, + }, + }, + m2: &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{3}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + }, + }, }, - PositiveDelta: []int64{5}, - Bucket: []*dto.Bucket{ - {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + validate: func(t *testing.T, h *dto.Histogram) { + require.Equal(t, uint64(15), h.GetSampleCount()) + require.Equal(t, 150.0, h.GetSampleSum()) + require.Equal(t, uint64(3), h.GetZeroCount()) + require.Equal(t, 1, len(h.Bucket)) + require.Equal(t, uint64(8), h.Bucket[0].GetCumulativeCount()) + posCounts := deltasToCountsInt(h.GetPositiveDelta()) + require.Equal(t, 1, len(posCounts)) + require.Equal(t, int64(8), posCounts[0]) }, }, - } - - // Second metric with native histogram - m2 := &dto.Metric{ - Histogram: &dto.Histogram{ - Schema: &schema, - ZeroThreshold: &zt, - SampleCount: proto.Uint64(5), - SampleSum: proto.Float64(50.0), - ZeroCount: proto.Uint64(1), - PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + { + name: "native + classic", + m1: &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + }, + }, + m2: &dto.Metric{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(5)}, + }, + }, + }, + validate: func(t *testing.T, h *dto.Histogram) { + require.Equal(t, uint64(15), h.GetSampleCount()) + require.Equal(t, 150.0, h.GetSampleSum()) + require.Equal(t, int32(0), h.GetSchema()) + require.Equal(t, uint64(2), h.GetZeroCount()) + posCounts := deltasToCountsInt(h.GetPositiveDelta()) + require.Equal(t, int64(5), posCounts[0]) + require.Equal(t, 2, len(h.Bucket)) + require.Equal(t, 1.0, h.Bucket[0].GetUpperBound()) + require.Equal(t, uint64(3), h.Bucket[0].GetCumulativeCount()) + require.Equal(t, 5.0, h.Bucket[1].GetUpperBound()) + require.Equal(t, uint64(5), h.Bucket[1].GetCumulativeCount()) + }, + }, + { + name: "classic + classic", + m1: &dto.Metric{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + }, + }, + }, + m2: &dto.Metric{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(4)}, + }, + }, }, - PositiveDelta: []int64{3}, - Bucket: []*dto.Bucket{ - {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + validate: func(t *testing.T, h *dto.Histogram) { + require.Equal(t, uint64(15), h.GetSampleCount()) + require.Equal(t, 150.0, h.GetSampleSum()) + require.Nil(t, h.Schema) + require.Equal(t, uint64(0), h.GetZeroCount()) + require.Equal(t, 2, len(h.Bucket)) + bucketCounts := make(map[float64]uint64) + for _, b := range h.Bucket { + bucketCounts[b.GetUpperBound()] = b.GetCumulativeCount() + } + require.Equal(t, uint64(8), bucketCounts[1.0]) + require.Equal(t, uint64(12), bucketCounts[5.0]) }, }, } - mergeHistogram(m1, m2) - - h := m1.Histogram - require.Equal(t, uint64(15), h.GetSampleCount()) // 10 + 5 - require.Equal(t, 150.0, h.GetSampleSum()) // 100 + 50 - require.Equal(t, uint64(3), h.GetZeroCount()) // 2 + 1 - - // Check classic buckets merged - require.Equal(t, 1, len(h.Bucket)) - require.Equal(t, uint64(8), h.Bucket[0].GetCumulativeCount()) // 5 + 3 - - // Check native buckets merged (should be 5 + 3 = 8) - posCounts := deltasToCountsInt(h.GetPositiveDelta()) - require.Equal(t, 1, len(posCounts)) - require.Equal(t, int64(8), posCounts[0]) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mergeHistogram(tt.m1, tt.m2) + tt.validate(t, tt.m1.Histogram) + }) + } } From c62f5c6bfc4afff66d138e1953f495db3847c432 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Wed, 25 Mar 2026 22:28:58 +0000 Subject: [PATCH 5/5] fixing tests, lint Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- ...> scrape_native_histogram_metrics_test.go} | 90 ++++++++++++++----- 2 files changed, 68 insertions(+), 24 deletions(-) rename integration/{dual_mode_metrics_test.go => scrape_native_histogram_metrics_test.go} (63%) diff --git a/CHANGELOG.md b/CHANGELOG.md index c04426b4be8..52e44b86fd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased -* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 +* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 diff --git a/integration/dual_mode_metrics_test.go b/integration/scrape_native_histogram_metrics_test.go similarity index 63% rename from integration/dual_mode_metrics_test.go rename to integration/scrape_native_histogram_metrics_test.go index b05a47cb738..72937b36b46 100644 --- a/integration/dual_mode_metrics_test.go +++ b/integration/scrape_native_histogram_metrics_test.go @@ -5,7 +5,6 @@ package integration import ( "fmt" "io" - "math" "net/http" "testing" "time" @@ -20,13 +19,19 @@ import ( "github.com/cortexproject/cortex/integration/e2ecortex" ) -func scrapeMetricsProtobuf(endpoint string) (map[string]*io_prometheus_client.MetricFamily, error) { +func scrapeMetrics(endpoint string, useProtobuf bool) (map[string]*io_prometheus_client.MetricFamily, error) { req, err := http.NewRequest("GET", endpoint, nil) if err != nil { return nil, err } - req.Header.Set("Accept", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited") + var format expfmt.Format + if useProtobuf { + req.Header.Set("Accept", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited") + format = expfmt.NewFormat(expfmt.TypeProtoDelim) + } else { + format = expfmt.NewFormat(expfmt.TypeTextPlain) + } client := &http.Client{Timeout: 5 * time.Second} resp, err := client.Do(req) @@ -40,7 +45,7 @@ func scrapeMetricsProtobuf(endpoint string) (map[string]*io_prometheus_client.Me } families := make(map[string]*io_prometheus_client.MetricFamily) - decoder := expfmt.NewDecoder(resp.Body, expfmt.FmtProtoDelim) + decoder := expfmt.NewDecoder(resp.Body, format) for { mf := &io_prometheus_client.MetricFamily{} @@ -57,12 +62,9 @@ func scrapeMetricsProtobuf(endpoint string) (map[string]*io_prometheus_client.Me return families, nil } -// TestDualModeHistogramExposition validates cortex_ingester_tsdb_compaction_duration_seconds -// is exposed in dual mode with both classic buckets and native histogram fields. -func TestDualModeHistogramExposition(t *testing.T) { +func setupCortexWithNativeHistograms(t *testing.T) (*e2e.Scenario, *e2ecortex.CortexService) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) - defer s.Close() consul := e2edb.NewConsulWithName("consul") require.NoError(t, s.StartAndWaitReady(consul)) @@ -85,7 +87,6 @@ func TestDualModeHistogramExposition(t *testing.T) { cortex := e2ecortex.NewSingleBinary("cortex", flags, "") require.NoError(t, s.StartAndWaitReady(cortex)) - require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") @@ -111,7 +112,14 @@ func TestDualModeHistogramExposition(t *testing.T) { require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(100), "cortex_ingester_ingested_samples_total")) time.Sleep(5 * time.Second) - families, err := scrapeMetricsProtobuf(fmt.Sprintf("http://%s/metrics", cortex.HTTPEndpoint())) + return s, cortex +} + +func TestNativeHistogramExposition(t *testing.T) { + s, cortex := setupCortexWithNativeHistograms(t) + defer s.Close() + + families, err := scrapeMetrics(fmt.Sprintf("http://%s/metrics", cortex.HTTPEndpoint()), true) require.NoError(t, err) histFamily, ok := families["cortex_ingester_tsdb_compaction_duration_seconds"] @@ -121,27 +129,63 @@ func TestDualModeHistogramExposition(t *testing.T) { metrics := histFamily.GetMetric() require.NotEmpty(t, metrics) - expectedBuckets := []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, math.Inf(1)} - for _, metric := range metrics { h := metric.GetHistogram() require.NotNil(t, h) - buckets := h.GetBucket() - require.Equal(t, 14, len(buckets)) // testing classic histogram custom buckets - - for i, bucket := range buckets { - require.Equal(t, expectedBuckets[i], bucket.GetUpperBound()) // testing classic histogram custom bucket boundaries - } - - // Testing Native histogram fields require.Equal(t, int32(3), h.GetSchema()) require.NotNil(t, h.GetZeroThreshold()) require.Equal(t, uint64(0), h.GetZeroCount()) sampleCount := h.GetSampleCount() - sampleSum := h.GetSampleSum() - require.Greater(t, sampleSum, 0.0) - require.Equal(t, sampleCount, buckets[len(buckets)-1].GetCumulativeCount()) + require.Greater(t, sampleCount, uint64(0)) + require.Greater(t, h.GetSampleSum(), 0.0) + + posSpans := h.GetPositiveSpan() + require.NotEmpty(t, posSpans) + posDeltas := h.GetPositiveDelta() + require.NotEmpty(t, posDeltas) + + var posCount uint64 + var count int64 + for _, delta := range posDeltas { + count += delta + posCount += uint64(count) + } + require.Equal(t, sampleCount, posCount) + + negSpans := h.GetNegativeSpan() + require.Empty(t, negSpans) + require.Empty(t, h.GetNegativeDelta()) + } +} + +func TestClassicHistogramExposition(t *testing.T) { + s, cortex := setupCortexWithNativeHistograms(t) + defer s.Close() + + families, err := scrapeMetrics(fmt.Sprintf("http://%s/metrics", cortex.HTTPEndpoint()), false) + require.NoError(t, err) + + histFamily, ok := families["cortex_ingester_tsdb_compaction_duration_seconds"] + require.True(t, ok) + require.Equal(t, io_prometheus_client.MetricType_HISTOGRAM, histFamily.GetType()) + + metrics := histFamily.GetMetric() + require.NotEmpty(t, metrics) + + for _, metric := range metrics { + h := metric.GetHistogram() + require.NotNil(t, h) + + buckets := h.GetBucket() + require.NotEmpty(t, buckets) + + sampleCount := h.GetSampleCount() + require.Greater(t, sampleCount, uint64(0)) + require.Greater(t, h.GetSampleSum(), 0.0) + + lastBucket := buckets[len(buckets)-1] + require.Equal(t, sampleCount, lastBucket.GetCumulativeCount()) } }