From f04cbc8b1c204d09c9b81084a4a62db5bdf7c33e Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Fri, 3 Jul 2026 15:42:39 -0700 Subject: [PATCH 1/3] Detach ingester series from gRPC buffers to reduce heap Signed-off-by: Justin Jung --- pkg/querier/distributor_queryable.go | 16 +++++- pkg/querier/distributor_queryable_test.go | 68 +++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 263558e9b6..fe95bc924c 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -186,7 +186,10 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa continue } - ls := cortexpb.FromLabelAdaptersToLabels(result.Labels) + // Detach label and chunk data from gRPC unmarshal buffers so the Go GC + // can reclaim receive buffers and reduce heap usage. + ls := cortexpb.FromLabelAdaptersToLabelsWithCopy(result.Labels) + detachChunksFromBuffer(result.Chunks) chunks, err := chunkcompat.FromChunks(ls, result.Chunks) if err != nil { @@ -449,3 +452,14 @@ func labelHintsToSelectHints(hints *storage.LabelHints) *storage.SelectHints { Limit: hints.Limit, } } + +// detachChunksFromBuffer re-allocates chunk data byte slices so that +// the series no longer references the gRPC unmarshal buffer, allowing +// the Go GC to reclaim receive buffers and reduce heap usage. +func detachChunksFromBuffer(chunks []client.Chunk) { + for i := range chunks { + if len(chunks[i].Data) > 0 { + chunks[i].Data = append([]byte(nil), chunks[i].Data...) + } + } +} \ No newline at end of file diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 74a6f84ca9..b6afb34986 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -642,3 +642,71 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) { }) } } + +func BenchmarkIngesterStreamingSelect(b *testing.B) { + // Simulate a realistic ingester response: 100 series, each with labels and chunk data + // that reference a shared backing buffer (mimicking gRPC unmarshal behavior). + // This benchmark measures the allocations in the streamingSelect hot path. + const numSeries = 100 + const chunkDataSize = 1024 + + // Build a single large buffer to simulate the gRPC receive buffer. + // All label strings and chunk data will be slices into this buffer. + bufSize := numSeries * (chunkDataSize + 200) // 200 bytes for label strings per series + buf := make([]byte, bufSize) + for i := range buf { + buf[i] = byte(i % 256) + } + + buildResponse := func() *client.QueryStreamResponse { + offset := 0 + series := make([]client.TimeSeriesChunk, numSeries) + for i := range series { + // Labels that reference the shared buffer (simulating protobuf unmarshal) + nameStr := string(buf[offset : offset+10]) + offset += 10 + valueStr := string(buf[offset : offset+20]) + offset += 20 + + // Chunk data that references the shared buffer + chunkData := buf[offset : offset+chunkDataSize] + offset += chunkDataSize + + series[i] = client.TimeSeriesChunk{ + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: nameStr}, + {Name: "instance", Value: valueStr}, + }, + Chunks: []client.Chunk{ + { + StartTimestampMs: 0, + EndTimestampMs: 1000, + Data: chunkData, + }, + }, + } + } + return &client.QueryStreamResponse{Chunkseries: series} + } + + b.Run("with_detach", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + resp := buildResponse() + for _, result := range resp.Chunkseries { + _ = cortexpb.FromLabelAdaptersToLabelsWithCopy(result.Labels) + detachChunksFromBuffer(result.Chunks) + } + } + }) + + b.Run("without_detach", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + resp := buildResponse() + for _, result := range resp.Chunkseries { + _ = cortexpb.FromLabelAdaptersToLabels(result.Labels) + } + } + }) +} From dd2b484ad7db55250cb8af438c60761c92a9035f Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Fri, 3 Jul 2026 15:51:08 -0700 Subject: [PATCH 2/3] Add changelog Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a9c2e68ec..1f41e994fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628 * [ENHANCEMENT] Query Frontend: Rename `time_taken` field to `time_taken_ms` and make it return millisecond count. #7649 * [ENHANCEMENT] Update prometheus alertmanager version to v0.33.0. #7647 +* [ENHANCEMENT] Querier/Ingester: Detach ingester series from gRPC buffers to reduce heap. #7670 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 From 4c7ab6742e48892ad0ba7bb99d71de1c21833c54 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Fri, 3 Jul 2026 16:17:49 -0700 Subject: [PATCH 3/3] Lint Signed-off-by: Justin Jung --- pkg/querier/distributor_queryable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index fe95bc924c..8dc617b8a3 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -462,4 +462,4 @@ func detachChunksFromBuffer(chunks []client.Chunk) { chunks[i].Data = append([]byte(nil), chunks[i].Data...) } } -} \ No newline at end of file +}