diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a9c2e68ec..71c02300f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628 * [ENHANCEMENT] Query Frontend: Rename `time_taken` field to `time_taken_ms` and make it return millisecond count. #7649 * [ENHANCEMENT] Update prometheus alertmanager version to v0.33.0. #7647 +* [ENHANCEMENT] Compactor: Reduce object storage GET calls when updating the bucket index by skipping re-reading parquet converter markers for blocks that already have a valid-version parquet entry in the previous index. #7669 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 42f5fb1620..f02ca62b64 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -303,6 +303,12 @@ func (w *Updater) updateParquetBlocks(ctx context.Context, blocks []*Block) erro // Check if parquet mark has been uploaded or deleted for the block. for _, m := range blocks { if _, ok := discoveredParquetBlocks[m.ID]; ok { + // A marker at a valid version is never rewritten (the converter won't + // re-convert the block), so skip re-reading it to save an object storage + // GET. A version that later becomes invalid is re-read to pick up changes. + if m.Parquet != nil && parquet.ValidConverterMarkVersion(m.Parquet.Version) { + continue + } if err := w.updateParquetBlockIndexEntry(ctx, m.ID, m); err != nil { return err } diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index d9b0d559fb..1b1116cf32 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -497,6 +497,83 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) { } } +func TestUpdater_UpdateParquetBlocks_SkipsReadingExistingMarker(t *testing.T) { + const userID = "user-1" + + ctx := context.Background() + logger := log.NewNopLogger() + + fsBkt, _ := testutil.PrepareFilesystemBucket(t) + + // Generate the initial index with a block that has a valid parquet marker. + gBkt := BucketWithGlobalMarkers(fsBkt) + block1 := testutil.MockStorageBlock(t, gBkt, userID, 10, 20) + block1Mark := testutil.MockStorageParquetConverterMark(t, gBkt, userID, block1, 3) + + w := NewUpdater(gBkt, userID, nil, logger).EnableParquet() + idx, _, _, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, idx, gBkt, userID, + []tsdb.BlockMeta{block1}, + nil, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards}, + }) + + // Wrap the bucket so that any GET on the block's parquet marker fails. Since + // the marker is already present and valid in the previous index, the updater + // must skip re-reading it and therefore must not perform any GET. + markerPath := path.Join(userID, block1.ULID.String(), parquet.ConverterMarkerFileName) + mock := &testutil.MockBucketFailure{ + Bucket: fsBkt, + GetFailures: map[string]error{markerPath: errors.New("unexpected GET on parquet marker")}, + } + iMock := objstore.WrapWithMetrics(mock, nil, "") + wFail := NewUpdater(BucketWithGlobalMarkers(iMock), userID, nil, logger).EnableParquet() + + idx, _, _, err = wFail.UpdateIndex(ctx, idx) + require.NoError(t, err) + // No GET should have been performed since the marker read was skipped. + assert.Equal(t, int32(0), mock.GetCalls.Load()) + assertBucketIndexEqualWithParquet(t, idx, gBkt, userID, + []tsdb.BlockMeta{block1}, + nil, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards}, + }) +} + +func TestUpdater_UpdateParquetBlocks_ResetsFieldWhenMarkerRemoved(t *testing.T) { + const userID = "user-1" + + ctx := context.Background() + logger := log.NewNopLogger() + + fsBkt, _ := testutil.PrepareFilesystemBucket(t) + + // Generate the initial index with a block that has a valid parquet marker. + gBkt := BucketWithGlobalMarkers(fsBkt) + block1 := testutil.MockStorageBlock(t, gBkt, userID, 10, 20) + block1Mark := testutil.MockStorageParquetConverterMark(t, gBkt, userID, block1, 3) + + w := NewUpdater(gBkt, userID, nil, logger).EnableParquet() + idx, _, _, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, idx, gBkt, userID, + []tsdb.BlockMeta{block1}, + nil, map[string]*parquet.ConverterMarkMeta{ + block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards}, + }) + + // Remove the parquet marker (both the per-block and global location) and + // update the index. The parquet field must be reset to nil. + require.NoError(t, gBkt.Delete(ctx, path.Join(userID, block1.ULID.String(), parquet.ConverterMarkerFileName))) + + idx, _, _, err = w.UpdateIndex(ctx, idx) + require.NoError(t, err) + assertBucketIndexEqualWithParquet(t, idx, gBkt, userID, + []tsdb.BlockMeta{block1}, + nil, map[string]*parquet.ConverterMarkMeta{}) +} + func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 { metaFile := path.Join(userID, blockID.String(), block.MetaFilename)