Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628
* [ENHANCEMENT] Query Frontend: Rename `time_taken` field to `time_taken_ms` and make it return millisecond count. #7649
* [ENHANCEMENT] Update prometheus alertmanager version to v0.33.0. #7647
* [ENHANCEMENT] Compactor: Reduce object storage GET calls when updating the bucket index by skipping re-reading parquet converter markers for blocks that already have a valid-version parquet entry in the previous index. #7669
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/bucketindex/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ func (w *Updater) updateParquetBlocks(ctx context.Context, blocks []*Block) erro
// Check if parquet mark has been uploaded or deleted for the block.
for _, m := range blocks {
if _, ok := discoveredParquetBlocks[m.ID]; ok {
// A marker at a valid version is never rewritten (the converter won't
// re-convert the block), so skip re-reading it to save an object storage
// GET. A version that later becomes invalid is re-read to pick up changes.
if m.Parquet != nil && parquet.ValidConverterMarkVersion(m.Parquet.Version) {
continue
}
if err := w.updateParquetBlockIndexEntry(ctx, m.ID, m); err != nil {
return err
}
Expand Down
77 changes: 77 additions & 0 deletions pkg/storage/tsdb/bucketindex/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,83 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
}
}

func TestUpdater_UpdateParquetBlocks_SkipsReadingExistingMarker(t *testing.T) {
const userID = "user-1"

ctx := context.Background()
logger := log.NewNopLogger()

fsBkt, _ := testutil.PrepareFilesystemBucket(t)

// Generate the initial index with a block that has a valid parquet marker.
gBkt := BucketWithGlobalMarkers(fsBkt)
block1 := testutil.MockStorageBlock(t, gBkt, userID, 10, 20)
block1Mark := testutil.MockStorageParquetConverterMark(t, gBkt, userID, block1, 3)

w := NewUpdater(gBkt, userID, nil, logger).EnableParquet()
idx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, idx, gBkt, userID,
[]tsdb.BlockMeta{block1},
nil, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards},
})

// Wrap the bucket so that any GET on the block's parquet marker fails. Since
// the marker is already present and valid in the previous index, the updater
// must skip re-reading it and therefore must not perform any GET.
markerPath := path.Join(userID, block1.ULID.String(), parquet.ConverterMarkerFileName)
mock := &testutil.MockBucketFailure{
Bucket: fsBkt,
GetFailures: map[string]error{markerPath: errors.New("unexpected GET on parquet marker")},
}
iMock := objstore.WrapWithMetrics(mock, nil, "")
wFail := NewUpdater(BucketWithGlobalMarkers(iMock), userID, nil, logger).EnableParquet()

idx, _, _, err = wFail.UpdateIndex(ctx, idx)
require.NoError(t, err)
// No GET should have been performed since the marker read was skipped.
assert.Equal(t, int32(0), mock.GetCalls.Load())
assertBucketIndexEqualWithParquet(t, idx, gBkt, userID,
[]tsdb.BlockMeta{block1},
nil, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards},
})
}

func TestUpdater_UpdateParquetBlocks_ResetsFieldWhenMarkerRemoved(t *testing.T) {
const userID = "user-1"

ctx := context.Background()
logger := log.NewNopLogger()

fsBkt, _ := testutil.PrepareFilesystemBucket(t)

// Generate the initial index with a block that has a valid parquet marker.
gBkt := BucketWithGlobalMarkers(fsBkt)
block1 := testutil.MockStorageBlock(t, gBkt, userID, 10, 20)
block1Mark := testutil.MockStorageParquetConverterMark(t, gBkt, userID, block1, 3)

w := NewUpdater(gBkt, userID, nil, logger).EnableParquet()
idx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, idx, gBkt, userID,
[]tsdb.BlockMeta{block1},
nil, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1Mark.Version, Shards: block1Mark.Shards},
})

// Remove the parquet marker (both the per-block and global location) and
// update the index. The parquet field must be reset to nil.
require.NoError(t, gBkt.Delete(ctx, path.Join(userID, block1.ULID.String(), parquet.ConverterMarkerFileName)))

idx, _, _, err = w.UpdateIndex(ctx, idx)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, idx, gBkt, userID,
[]tsdb.BlockMeta{block1},
nil, map[string]*parquet.ConverterMarkMeta{})
}

func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 {
metaFile := path.Join(userID, blockID.String(), block.MetaFilename)

Expand Down
Loading