Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628
* [ENHANCEMENT] Query Frontend: Rename `time_taken` field to `time_taken_ms` and make it return millisecond count. #7649
* [ENHANCEMENT] Update prometheus alertmanager version to v0.33.0. #7647
* [ENHANCEMENT] Querier/Ingester: Detach ingester series from gRPC buffers to reduce heap. #7670
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
16 changes: 15 additions & 1 deletion pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa
continue
}

ls := cortexpb.FromLabelAdaptersToLabels(result.Labels)
// Detach label and chunk data from gRPC unmarshal buffers so the Go GC
// can reclaim receive buffers and reduce heap usage.
ls := cortexpb.FromLabelAdaptersToLabelsWithCopy(result.Labels)
detachChunksFromBuffer(result.Chunks)

chunks, err := chunkcompat.FromChunks(ls, result.Chunks)
if err != nil {
Expand Down Expand Up @@ -449,3 +452,14 @@ func labelHintsToSelectHints(hints *storage.LabelHints) *storage.SelectHints {
Limit: hints.Limit,
}
}

// detachChunksFromBuffer re-allocates chunk data byte slices so that
// the series no longer references the gRPC unmarshal buffer, allowing
// the Go GC to reclaim receive buffers and reduce heap usage.
func detachChunksFromBuffer(chunks []client.Chunk) {
for i := range chunks {
if len(chunks[i].Data) > 0 {
chunks[i].Data = append([]byte(nil), chunks[i].Data...)
}
}
}
68 changes: 68 additions & 0 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,3 +642,71 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) {
})
}
}

func BenchmarkIngesterStreamingSelect(b *testing.B) {
// Simulate a realistic ingester response: 100 series, each with labels and chunk data
// that reference a shared backing buffer (mimicking gRPC unmarshal behavior).
// This benchmark measures the allocations in the streamingSelect hot path.
const numSeries = 100
const chunkDataSize = 1024

// Build a single large buffer to simulate the gRPC receive buffer.
// All label strings and chunk data will be slices into this buffer.
bufSize := numSeries * (chunkDataSize + 200) // 200 bytes for label strings per series
buf := make([]byte, bufSize)
for i := range buf {
buf[i] = byte(i % 256)
}

buildResponse := func() *client.QueryStreamResponse {
offset := 0
series := make([]client.TimeSeriesChunk, numSeries)
for i := range series {
// Labels that reference the shared buffer (simulating protobuf unmarshal)
nameStr := string(buf[offset : offset+10])
offset += 10
valueStr := string(buf[offset : offset+20])
offset += 20

// Chunk data that references the shared buffer
chunkData := buf[offset : offset+chunkDataSize]
offset += chunkDataSize

series[i] = client.TimeSeriesChunk{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: nameStr},
{Name: "instance", Value: valueStr},
},
Chunks: []client.Chunk{
{
StartTimestampMs: 0,
EndTimestampMs: 1000,
Data: chunkData,
},
},
}
}
return &client.QueryStreamResponse{Chunkseries: series}
}

b.Run("with_detach", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
resp := buildResponse()
for _, result := range resp.Chunkseries {
_ = cortexpb.FromLabelAdaptersToLabelsWithCopy(result.Labels)
detachChunksFromBuffer(result.Chunks)
}
}
})

b.Run("without_detach", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
resp := buildResponse()
for _, result := range resp.Chunkseries {
_ = cortexpb.FromLabelAdaptersToLabels(result.Labels)
}
}
})
}
Loading