
Commit 0772991

Copy thanos shipper (#1957)

* Copy shipper from Thanos.
* Remove support for uploading compacted blocks.
* Always allow out-of-order uploads. Removed unused overlap checker.
* Rename Shipper interface to BlocksUploader, and ThanosShipper to Shipper.
* Extract readShippedBlocks method from user_tsdb.go
* Added shipper unit tests (copied and adapted from original tests)
* Add faillint rule to avoid using Thanos shipper.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
1 parent d6de837 commit 0772991

File tree

6 files changed: +470 −29 lines


Makefile

Lines changed: 2 additions & 0 deletions
@@ -291,6 +291,8 @@ lint: check-makefiles
 	faillint -paths "github.com/thanos-io/thanos/pkg/block.{NewIgnoreDeletionMarkFilter}" \
 		./pkg/compactor/...
 
+	faillint -paths "github.com/thanos-io/thanos/pkg/shipper.{New}" ./pkg/...
+
 	# We've copied github.com/NYTimes/gziphandler to pkg/util/gziphandler
 	# at least until https://github.com/nytimes/gziphandler/pull/112 is merged
 	faillint -paths "github.com/NYTimes/gziphandler" \
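For context, faillint fails the lint target whenever code under the given paths references one of the listed symbols. A minimal sketch of a hypothetical file that the new rule would now reject (the file and variable are made up for illustration):

// pkg/example/bad.go — hypothetical file, shown only to illustrate the rule.
package example

import "github.com/thanos-io/thanos/pkg/shipper"

// Any reference to shipper.New under ./pkg/... now makes `make lint` fail;
// new code is expected to use the in-tree NewShipper from pkg/ingester instead.
var newUploader = shipper.New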

pkg/ingester/ingester.go

Lines changed: 3 additions & 6 deletions
@@ -41,7 +41,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb/hashcache"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/objstore"
-	"github.com/thanos-io/thanos/pkg/shipper"
 	"github.com/weaveworks/common/httpgrpc"
 	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
@@ -95,8 +94,8 @@ var (
 	errExemplarRef = errors.New("exemplars not ingested because series not already present")
 )
 
-// Shipper interface is used to have an easy way to mock it in tests.
-type Shipper interface {
+// BlocksUploader interface is used to have an easy way to mock it in tests.
+type BlocksUploader interface {
 	Sync(ctx context.Context) (uploaded int, err error)
 }
 
@@ -1522,15 +1521,13 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 
 	// Create a new shipper for this database
 	if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
-		userDB.shipper = shipper.New(
+		userDB.shipper = NewShipper(
 			userLogger,
 			tsdbPromReg,
 			udir,
 			bucket.NewUserBucketClient(userID, i.bucket, i.limits),
 			func() labels.Labels { return l },
 			metadata.ReceiveSource,
-			false, // No need to upload compacted blocks. Mimir compactor takes care of that.
-			true, // Allow out of order uploads. It's fine in Mimir's context.
 			metadata.NoneFunc,
 		)
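Anything satisfying this single-method interface can act as the ingester's shipper. As a rough sketch of how a BlocksUploader might be driven — this is a hypothetical helper, not the ingester's actual shipping loop, and it assumes the package's existing context, time, log, and level imports:

// Hypothetical driver: ship new blocks on a fixed period until the context is cancelled.
func shipPeriodically(ctx context.Context, u BlocksUploader, period time.Duration, logger log.Logger) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if uploaded, err := u.Sync(ctx); err != nil {
				level.Warn(logger).Log("msg", "shipping blocks failed", "err", err)
			} else if uploaded > 0 {
				level.Info(logger).Log("msg", "shipped blocks", "count", uploaded)
			}
		}
	}
}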

pkg/ingester/ingester_test.go

Lines changed: 7 additions & 7 deletions
@@ -3043,13 +3043,13 @@ func TestIngester_shipBlocks(t *testing.T) {
 	})
 
 	// Create the TSDB for 3 users and then replace the shipper with the mocked one
-	mocks := []*shipperMock{}
+	mocks := []*uploaderMock{}
 	for _, userID := range []string{"user-1", "user-2", "user-3"} {
 		userDB, err := i.getOrCreateTSDB(userID, false)
 		require.NoError(t, err)
 		require.NotNil(t, userDB)
 
-		m := &shipperMock{}
+		m := &uploaderMock{}
 		m.On("Sync", mock.Anything).Return(0, nil)
 		mocks = append(mocks, m)
 
@@ -3293,12 +3293,12 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
 	require.NotNil(t, db)
 }
 
-type shipperMock struct {
+type uploaderMock struct {
 	mock.Mock
 }
 
-// Sync mocks Shipper.Sync()
-func (m *shipperMock) Sync(ctx context.Context) (uploaded int, err error) {
+// Sync mocks BlocksUploader.Sync()
+func (m *uploaderMock) Sync(ctx context.Context) (uploaded int, err error) {
 	args := m.Called(ctx)
 	return args.Int(0), args.Error(1)
 }
@@ -3619,8 +3619,8 @@ func TestIngester_ForFlush(t *testing.T) {
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), i))
 }
 
-func mockUserShipper(t *testing.T, i *Ingester) *uploaderMock {
-	m := &shipperMock{}
+func mockUserShipper(t *testing.T, i *Ingester) *uploaderMock {
+	m := &uploaderMock{}
 	userDB, err := i.getOrCreateTSDB(userID, false)
 	require.NoError(t, err)
 	require.NotNil(t, userDB)
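Since uploaderMock embeds testify's mock.Mock, tests program its return values and verify calls in the usual testify style. A minimal, hypothetical usage sketch (not part of the diff above):

m := &uploaderMock{}
m.On("Sync", mock.Anything).Return(2, nil) // pretend two blocks were uploaded

uploaded, err := m.Sync(context.Background())
require.NoError(t, err)
require.Equal(t, 2, uploaded)
m.AssertExpectations(t)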

pkg/ingester/shipper.go

Lines changed: 306 additions & 0 deletions
@@ -0,0 +1,306 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/shipper/shipper.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: The Thanos Authors.
+
+package ingester
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"sort"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/shipper"
+)
+
+type metrics struct {
+	dirSyncs        prometheus.Counter
+	dirSyncFailures prometheus.Counter
+	uploads         prometheus.Counter
+	uploadFailures  prometheus.Counter
+}
+
+func newMetrics(reg prometheus.Registerer) *metrics {
+	var m metrics
+
+	m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_shipper_dir_syncs_total",
+		Help: "Total number of dir syncs",
+	})
+	m.dirSyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_shipper_dir_sync_failures_total",
+		Help: "Total number of failed dir syncs",
+	})
+	m.uploads = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_shipper_uploads_total",
+		Help: "Total number of uploaded blocks",
+	})
+	m.uploadFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_shipper_upload_failures_total",
+		Help: "Total number of block upload failures",
+	})
+	return &m
+}
+
+// Shipper watches a directory for matching files and directories and uploads
+// them to a remote data store.
+// Shipper implements BlocksUploader interface.
+type Shipper struct {
+	logger  log.Logger
+	dir     string
+	metrics *metrics
+	bucket  objstore.Bucket
+	labels  func() labels.Labels
+	source  metadata.SourceType
+
+	hashFunc metadata.HashFunc
+}
+
+// NewShipper creates a new uploader that detects new TSDB blocks in dir and uploads them
+// to the remote object storage if necessary. It attaches the Thanos metadata section to
+// each uploaded block's meta JSON file.
+func NewShipper(
+	logger log.Logger,
+	r prometheus.Registerer,
+	dir string,
+	bucket objstore.Bucket,
+	lbls func() labels.Labels,
+	source metadata.SourceType,
+	hashFunc metadata.HashFunc,
+) *Shipper {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+	if lbls == nil {
+		lbls = func() labels.Labels { return nil }
+	}
+
+	return &Shipper{
+		logger:   logger,
+		dir:      dir,
+		bucket:   bucket,
+		labels:   lbls,
+		metrics:  newMetrics(r),
+		source:   source,
+		hashFunc: hashFunc,
+	}
+}
+
+// Sync performs a single synchronization, which ensures that all non-compacted local blocks
+// have been uploaded to the object bucket once.
+//
+// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
+func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
+	meta, err := shipper.ReadMetaFile(s.dir)
+	if err != nil {
+		// If we encounter any error, proceed with an empty meta file and overwrite it later.
+		// The meta file is only used to avoid unnecessary bucket.Exists calls,
+		// which are properly handled by the system if they occur anyway.
+		if !os.IsNotExist(err) {
+			level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
+		}
+		meta = &shipper.Meta{Version: shipper.MetaVersion1}
+	}
+
+	// Build a map of blocks we already uploaded.
+	hasUploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
+	for _, id := range meta.Uploaded {
+		hasUploaded[id] = struct{}{}
+	}
+
+	// Reset the uploaded slice so we can rebuild it only with blocks that still exist locally.
+	meta.Uploaded = nil
+
+	var uploadErrs int
+
+	metas, err := s.blockMetasFromOldest()
+	if err != nil {
+		return 0, err
+	}
+	for _, m := range metas {
+		// Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket,
+		// it was generally removed by the compaction process.
+		if _, uploaded := hasUploaded[m.ULID]; uploaded {
+			meta.Uploaded = append(meta.Uploaded, m.ULID)
+			continue
+		}
+
+		if m.Stats.NumSamples == 0 {
+			// Ignore empty blocks.
+			level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID)
+			continue
+		}
+
+		// We only ship blocks of the first compaction level, as part of the normal flow.
+		if m.Compaction.Level > 1 {
+			continue
+		}
+
+		// Check against bucket if the meta file for this block exists.
+		ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
+		if err != nil {
+			return 0, errors.Wrap(err, "check exists")
+		}
+		if ok {
+			meta.Uploaded = append(meta.Uploaded, m.ULID)
+			continue
+		}
+
+		if err := s.upload(ctx, m); err != nil {
+			// Don't return the error here: we want the remaining blocks to be uploaded even
+			// though this one failed. It will be retried on the next Sync iteration.
+			level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
+			uploadErrs++
+			continue
+		}
+		meta.Uploaded = append(meta.Uploaded, m.ULID)
+		uploaded++
+		s.metrics.uploads.Inc()
+	}
+	if err := shipper.WriteMetaFile(s.logger, s.dir, meta); err != nil {
+		level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
+	}
+
+	s.metrics.dirSyncs.Inc()
+	if uploadErrs > 0 {
+		s.metrics.uploadFailures.Add(float64(uploadErrs))
+		return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs)
+	}
+
+	return uploaded, nil
+}
+
+// upload uploads the given block to remote storage, unless it already exists there.
+// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error.
+func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
+	level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID)
+
+	// We hard-link the files into a temporary upload directory so we are not affected
+	// by other operations happening against the TSDB directory.
+	updir := filepath.Join(s.dir, "thanos", "upload", meta.ULID.String())
+
+	// Remove updir just in case.
+	if err := os.RemoveAll(updir); err != nil {
+		return errors.Wrap(err, "clean upload directory")
+	}
+	if err := os.MkdirAll(updir, 0750); err != nil {
+		return errors.Wrap(err, "create upload dir")
+	}
+	defer func() {
+		if err := os.RemoveAll(updir); err != nil {
+			level.Error(s.logger).Log("msg", "failed to clean upload directory", "err", err)
+		}
+	}()
+
+	dir := filepath.Join(s.dir, meta.ULID.String())
+	if err := hardlinkBlock(dir, updir); err != nil {
+		return errors.Wrap(err, "hard link block")
+	}
+	// Attach current labels and write a new meta file with Thanos extensions.
+	if lset := s.labels(); lset != nil {
+		meta.Thanos.Labels = lset.Map()
+	}
+	meta.Thanos.Source = s.source
+	meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
+	if err := meta.WriteToDir(s.logger, updir); err != nil {
+		return errors.Wrap(err, "write meta file")
+	}
+	return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)
+}
+
+// blockMetasFromOldest returns the block meta of each block found in dir
+// sorted by minTime asc.
+func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) {
+	fis, err := ioutil.ReadDir(s.dir)
+	if err != nil {
+		return nil, errors.Wrap(err, "read dir")
+	}
+	names := make([]string, 0, len(fis))
+	for _, fi := range fis {
+		names = append(names, fi.Name())
+	}
+	for _, n := range names {
+		if _, ok := block.IsBlockDir(n); !ok {
+			continue
+		}
+		dir := filepath.Join(s.dir, n)
+
+		fi, err := os.Stat(dir)
+		if err != nil {
+			return nil, errors.Wrapf(err, "stat block %v", dir)
+		}
+		if !fi.IsDir() {
+			continue
+		}
+		m, err := metadata.ReadFromDir(dir)
+		if err != nil {
+			return nil, errors.Wrapf(err, "read metadata for block %v", dir)
+		}
+		metas = append(metas, m)
+	}
+	sort.Slice(metas, func(i, j int) bool {
+		return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
+	})
+	return metas, nil
+}
+
+func hardlinkBlock(src, dst string) error {
+	chunkDir := filepath.Join(dst, block.ChunksDirname)
+
+	if err := os.MkdirAll(chunkDir, 0750); err != nil {
+		return errors.Wrap(err, "create chunks dir")
+	}
+
+	fis, err := ioutil.ReadDir(filepath.Join(src, block.ChunksDirname))
+	if err != nil {
+		return errors.Wrap(err, "read chunk dir")
+	}
+	files := make([]string, 0, len(fis))
+	for _, fi := range fis {
+		files = append(files, fi.Name())
+	}
+	for i, fn := range files {
+		files[i] = filepath.Join(block.ChunksDirname, fn)
+	}
+	files = append(files, block.MetaFilename, block.IndexFilename)
+
+	for _, fn := range files {
+		if err := os.Link(filepath.Join(src, fn), filepath.Join(dst, fn)); err != nil {
+			return errors.Wrapf(err, "hard link file %s", fn)
+		}
+	}
+	return nil
+}
+
+func readShippedBlocks(dir string) (map[ulid.ULID]struct{}, error) {
+	shipperMeta, err := shipper.ReadMetaFile(dir)
+	if errors.Is(err, os.ErrNotExist) {
+		// If the meta file doesn't exist it means the shipper hasn't run yet.
+		shipperMeta = &shipper.Meta{}
+	} else if err != nil {
+		return nil, err
+	}
+
+	// Build a map of the shipped block IDs.
+	shippedBlocks := make(map[ulid.ULID]struct{}, len(shipperMeta.Uploaded))
+	for _, blockID := range shipperMeta.Uploaded {
+		shippedBlocks[blockID] = struct{}{}
+	}
+
+	return shippedBlocks, nil
+}
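Putting the pieces together: the ingester constructs one Shipper per user TSDB (see the ingester.go hunk above). A standalone sketch of the same wiring, assuming Thanos' filesystem bucket (github.com/thanos-io/thanos/pkg/objstore/filesystem) and placeholder paths purely for illustration:

// Hypothetical wiring inside some setup function; directories and labels are placeholders.
bkt, err := filesystem.NewBucket("/tmp/example-object-store")
if err != nil {
	return err
}
s := NewShipper(
	log.NewNopLogger(),
	prometheus.NewRegistry(),
	"/tmp/example-tsdb/user-1", // local TSDB dir watched for new blocks
	bkt,
	func() labels.Labels { return labels.FromStrings("user", "user-1") },
	metadata.ReceiveSource,
	metadata.NoneFunc,
)
uploaded, err := s.Sync(context.Background())
if err != nil {
	return err
}
// uploaded now holds the number of level-1 blocks shipped in this pass.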
