Commit d61a8d8

Split reflists to share their contents across snapshots

In current aptly, each repository and snapshot has its own reflist in the database. This brings a few problems with it:

- Given sufficiently large repositories and snapshots, these lists can get enormous, reaching >1MB. This is a problem for LevelDB's overall performance, as it tends to prefer values around the configured block size (defaults to just 4KiB).
- When you take these large repositories and snapshot them, you have a full, new copy of the reflist, even if only a few packages changed. This means that having a lot of snapshots with a few changes causes the database to basically be full of largely duplicate reflists.
- All the duplication also means that many of the same refs are being loaded repeatedly, which can cause some slowdown but, more notably, eats up huge amounts of memory.
- Adding on more and more new repositories and snapshots will cause the time and memory spent on things like cleanup and publishing to grow roughly linearly.

At the core, there are two problems here:

- Reflists get very big because there are just a lot of packages.
- Different reflists tend to duplicate much of the same contents.

*Split reflists* aim at solving this by separating reflists into 64 *buckets*. Package refs are sorted into individual buckets according to the following system:

- Take the first 3 letters of the package name, after dropping a `lib` prefix. (Using only the first 3 letters will cause packages with similar prefixes to end up in the same bucket, under the assumption that packages with similar names tend to be updated together.)
- Take the 64-bit xxhash of these letters. (xxhash was chosen because it has relatively good distribution across the individual bits, which is important for the next step.)
- Use the first 6 bits of the hash (range [0:63]) as an index into the buckets.

Once refs are placed in buckets, a sha256 digest of all the refs in the bucket is taken.
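
The bucketing steps above can be sketched in a few lines of Go. This is only an illustration under stated assumptions, not the code from this commit: FNV-1a from the standard library stands in for xxhash so the example has no external dependencies, "first 6 bits" is read as the 6 most significant bits of the hash, plain package names stand in for real package refs, and the way refs are fed into sha256 (sorted, newline-terminated) is invented for the example. The names `bucketPrefix`, `bucketIndex` and `bucketDigest` are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

const numBuckets = 64

// bucketPrefix returns the part of a package name that gets hashed:
// at most the first 3 bytes, after dropping a leading "lib".
func bucketPrefix(name string) string {
	name = strings.TrimPrefix(name, "lib")
	if len(name) > 3 {
		name = name[:3]
	}
	return name
}

// bucketIndex maps a package name to a bucket index in [0, 63].
// ASSUMPTION: FNV-1a is a stand-in for xxhash, and the "first 6 bits"
// are taken to be the 6 most significant bits of the 64-bit hash.
func bucketIndex(name string) int {
	h := fnv.New64a()
	h.Write([]byte(bucketPrefix(name)))
	return int(h.Sum64() >> 58)
}

// bucketDigest returns a hex sha256 digest over the refs of one bucket.
// ASSUMPTION: refs are sorted and newline-terminated before hashing;
// the real serialization is not described in the commit message.
func bucketDigest(refs []string) string {
	sorted := append([]string(nil), refs...)
	sort.Strings(sorted)
	h := sha256.New()
	for _, r := range sorted {
		h.Write([]byte(r))
		h.Write([]byte{'\n'})
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	buckets := make([][]string, numBuckets)
	for _, name := range []string{"libfoo1", "foo-utils", "bash", "zlib1g"} {
		i := bucketIndex(name)
		buckets[i] = append(buckets[i], name)
	}
	for i, refs := range buckets {
		if len(refs) > 0 {
			fmt.Printf("bucket %2d: %v digest=%s\n", i, refs, bucketDigest(refs)[:12])
		}
	}
}
```

In this sketch `libfoo1` and `foo-utils` both reduce to the prefix `foo`, so they always land in the same bucket, which is the grouping effect the commit message is after.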
These buckets are then stored in the database, split into roughly block-sized segments, and all the repositories and snapshots simply store an array of bucket digests.

This approach means that *repositories and snapshots can share their reflist buckets*. If a snapshot is taken of a repository, it will have the same contents, so its split reflist will point to the same buckets as the base repository, and only one copy of each bucket is stored in the database. When some packages in the repository change, only the buckets containing those packages will be modified; all the other buckets will remain unchanged, and thus their contents will still be shared.

Later on, when these reflists are loaded, each bucket is only loaded once, short-cutting the loading of many megabytes of data. In effect, split reflists are essentially copy-on-write, with only the changed buckets stored individually.

Changing the disk format means that a migration needs to take place, so that task is moved into the database cleanup step, which will migrate reflists over to split reflists, as well as delete any unused reflist buckets.

All the reflist tests are also changed to additionally test out split reflists; although the internal logic is all shared (since buckets are, themselves, just normal reflists), some special additions are needed to have native versions of the various reflist helper methods.

In our tests, we've observed the following improvements:

- Memory usage during publish and database cleanup, with `GOMEMLIMIT=2GiB`, goes down from ~3.2GiB (larger than the memory limit!) to ~0.7GiB, a decrease of ~4.5x.
- Database size decreases from 1.3GB to 367MB.

*In my local tests*, publish times had also decreased down to mere seconds but the same effect wasn't observed on the server, with the times staying around the same. My suspicions are that this is due to I/O performance: my local system is an M1 MBP, which almost certainly has much faster disk speeds than our DigitalOcean block volumes.
Split reflists include a side effect of requiring more random accesses from reading all the buckets by their keys, so if your random I/O performance is slower, it might cancel out the benefits. That being said, even in that case, the memory usage and database size advantages still persist.

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
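
The sharing and cleanup behaviour described in the commit message can be illustrated with a small content-addressed store. This is a sketch under assumptions rather than aptly's implementation: `Store`, `Put` and `Sweep` are hypothetical names, the store is an in-memory map instead of LevelDB, a reflist is reduced to a slice of digests (two buckets here instead of 64), and segmenting buckets into block-sized pieces is left out.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"strings"
)

// Digest identifies a bucket by the sha256 of its contents.
type Digest [sha256.Size]byte

// Store is a hypothetical in-memory stand-in for the database.
// Buckets are keyed by digest, so identical buckets are stored once.
type Store struct {
	buckets map[Digest][]string
}

func NewStore() *Store { return &Store{buckets: map[Digest][]string{}} }

// Put stores a bucket if it is not already present and returns its digest.
func (s *Store) Put(refs []string) Digest {
	d := Digest(sha256.Sum256([]byte(strings.Join(refs, "\n"))))
	if _, ok := s.buckets[d]; !ok {
		s.buckets[d] = append([]string(nil), refs...)
	}
	return d
}

// Sweep deletes every bucket that none of the given reflists reference
// and reports how many buckets were removed.
func (s *Store) Sweep(reflists ...[]Digest) int {
	live := map[Digest]bool{}
	for _, rl := range reflists {
		for _, d := range rl {
			live[d] = true
		}
	}
	removed := 0
	for d := range s.buckets {
		if !live[d] {
			delete(s.buckets, d)
			removed++
		}
	}
	return removed
}

func main() {
	s := NewStore()

	// A repository whose reflist consists of two buckets.
	repo := []Digest{
		s.Put([]string{"bash 5.2"}),
		s.Put([]string{"foo 1.0", "libfoo1 1.0"}),
	}

	// A snapshot of the repository points at the same buckets.
	snapshot := append([]Digest(nil), repo...)

	// Updating foo in the repository writes one new bucket;
	// the bash bucket is still shared with the snapshot.
	repo[1] = s.Put([]string{"foo 1.1", "libfoo1 1.1"})
	fmt.Println("shared bash bucket:", repo[0] == snapshot[0])
	fmt.Println("buckets stored:", len(s.buckets))

	// Once the snapshot is dropped, the old foo bucket is unreferenced.
	fmt.Println("buckets swept:", s.Sweep(repo))
}
```

The sweep is a plain mark-and-sweep over digests: everything reachable from a surviving reflist is marked, and the rest is deleted, which mirrors the role the database cleanup step plays for unused reflist buckets.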
1 parent 8e20b00 commit d61a8d8

70 files changed, with 1961 additions and 668 deletions.

api/api.go

Lines changed: 1 addition & 1 deletion
@@ -181,7 +181,7 @@ func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, p

 // Common piece of code to show list of packages,
 // with searching & details if requested
-func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {
+func showPackages(c *gin.Context, reflist deb.AnyRefList, collectionFactory *deb.CollectionFactory) {
 	result := []*deb.Package{}

 	list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)

api/db.go

Lines changed: 61 additions & 20 deletions
@@ -5,6 +5,7 @@ import (
 	"sort"

 	"github.com/aptly-dev/aptly/aptly"
+	"github.com/aptly-dev/aptly/database"
 	"github.com/aptly-dev/aptly/deb"
 	"github.com/aptly-dev/aptly/task"
 	"github.com/aptly-dev/aptly/utils"
@@ -20,18 +21,22 @@ func apiDbCleanup(c *gin.Context) {

 		collectionFactory := context.NewCollectionFactory()

-		// collect information about referenced packages...
-		existingPackageRefs := deb.NewPackageRefList()
+		// collect information about referenced packages and their reflist buckets...
+		existingPackageRefs := deb.NewSplitRefList()
+		existingBuckets := deb.NewRefListDigestSet()
+
+		reflistMigration := collectionFactory.RefListCollection().NewMigration()

 		out.Printf("Loading mirrors, local repos, snapshots and published repos...")
 		err = collectionFactory.RemoteRepoCollection().ForEach(func(repo *deb.RemoteRepo) error {
-			e := collectionFactory.RemoteRepoCollection().LoadComplete(repo)
-			if e != nil {
+			sl := deb.NewSplitRefList()
+			e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
+			if e != nil && e != database.ErrNotFound {
 				return e
 			}
-			if repo.RefList() != nil {
-				existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
-			}
+
+			existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+			existingBuckets.AddAllInRefList(sl)

 			return nil
 		})
@@ -40,14 +45,14 @@ func apiDbCleanup(c *gin.Context) {
 		}

 		err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
-			e := collectionFactory.LocalRepoCollection().LoadComplete(repo)
-			if e != nil {
+			sl := deb.NewSplitRefList()
+			e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
+			if e != nil && e != database.ErrNotFound {
 				return e
 			}

-			if repo.RefList() != nil {
-				existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
-			}
+			existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+			existingBuckets.AddAllInRefList(sl)

 			return nil
 		})
@@ -56,12 +61,14 @@ func apiDbCleanup(c *gin.Context) {
 		}

 		err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
-			e := collectionFactory.SnapshotCollection().LoadComplete(snapshot)
+			sl := deb.NewSplitRefList()
+			e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, snapshot.RefKey(), reflistMigration)
 			if e != nil {
 				return e
 			}

-			existingPackageRefs = existingPackageRefs.Merge(snapshot.RefList(), false, true)
+			existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+			existingBuckets.AddAllInRefList(sl)

 			return nil
 		})
@@ -73,25 +80,37 @@ func apiDbCleanup(c *gin.Context) {
 			if published.SourceKind != deb.SourceLocalRepo {
 				return nil
 			}
-			e := collectionFactory.PublishedRepoCollection().LoadComplete(published, collectionFactory)
-			if e != nil {
-				return e
-			}

 			for _, component := range published.Components() {
-				existingPackageRefs = existingPackageRefs.Merge(published.RefList(component), false, true)
+				sl := deb.NewSplitRefList()
+				e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, published.RefKey(component), reflistMigration)
+				if e != nil {
+					return e
+				}
+
+				existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+				existingBuckets.AddAllInRefList(sl)
 			}
 			return nil
 		})
 		if err != nil {
 			return nil, err
 		}

+		err = reflistMigration.Flush()
+		if err != nil {
+			return nil, err
+		}
+		if stats := reflistMigration.Stats(); stats.Reflists > 0 {
+			out.Printf("Split %d reflist(s) into %d bucket(s) (%d segment(s))",
+				stats.Reflists, stats.Buckets, stats.Segments)
+		}
+
 		// ... and compare it to the list of all packages
 		out.Printf("Loading list of all packages...")
 		allPackageRefs := collectionFactory.PackageCollection().AllPackageRefs()

-		toDelete := allPackageRefs.Subtract(existingPackageRefs)
+		toDelete := allPackageRefs.Subtract(existingPackageRefs.Flatten())

 		// delete packages that are no longer referenced
 		out.Printf("Deleting unreferenced packages (%d)...", toDelete.Len())
@@ -112,6 +131,28 @@ func apiDbCleanup(c *gin.Context) {
 			}
 		}

+		bucketsToDelete, err := collectionFactory.RefListCollection().AllBucketDigests()
+		if err != nil {
+			return nil, err
+		}
+
+		bucketsToDelete.RemoveAll(existingBuckets)
+
+		out.Printf("Deleting unreferenced reflist buckets (%d)...", bucketsToDelete.Len())
+		if bucketsToDelete.Len() > 0 {
+			batch := db.CreateBatch()
+			err := bucketsToDelete.ForEach(func(digest []byte) error {
+				return collectionFactory.RefListCollection().UnsafeDropBucket(digest, batch)
+			})
+			if err != nil {
+				return nil, err
+			}
+
+			if err := batch.Write(); err != nil {
+				return nil, err
+			}
+		}
+
 		// now, build a list of files that should be present in Repository (package pool)
 		out.Printf("Building list of files referenced by packages...")
 		referencedFiles := make([]string, 0, existingPackageRefs.Len())

api/metrics.go

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ func countPackagesByRepos() {

 		components := repo.Components()
 		for _, c := range components {
-			count := float64(len(repo.RefList(c).Refs))
+			count := float64(repo.RefList(c).Len())
 			apiReposPackageCountGauge.WithLabelValues(fmt.Sprintf("%s", (repo.SourceNames())), repo.Distribution, c).Set(count)
 		}

api/mirror.go

Lines changed: 6 additions & 6 deletions
@@ -121,7 +121,7 @@ func apiMirrorsCreate(c *gin.Context) {
 		return
 	}

-	err = collection.Add(repo)
+	err = collection.Add(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, fmt.Errorf("unable to add mirror: %s", err))
 		return
@@ -181,7 +181,7 @@ func apiMirrorsShow(c *gin.Context) {
 		return
 	}

-	err = collection.LoadComplete(repo)
+	err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
 	}
@@ -201,7 +201,7 @@ func apiMirrorsPackages(c *gin.Context) {
 		return
 	}

-	err = collection.LoadComplete(repo)
+	err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
 	}
@@ -395,12 +395,12 @@ func apiMirrorsUpdate(c *gin.Context) {
 			e := context.ReOpenDatabase()
 			if e == nil {
 				remote.MarkAsIdle()
-				collection.Update(remote)
+				collection.Update(remote, collectionFactory.RefListCollection())
 			}
 		}()

 		remote.MarkAsUpdating()
-		err = collection.Update(remote)
+		err = collection.Update(remote, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
 		}
@@ -545,7 +545,7 @@ func apiMirrorsUpdate(c *gin.Context) {

 		log.Info().Msgf("%s: Finalizing download\n", b.Name)
 		remote.FinalizeDownload(collectionFactory, out)
-		err = collectionFactory.RemoteRepoCollection().Update(remote)
+		err = collectionFactory.RemoteRepoCollection().Update(remote, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
 		}

api/publish.go

Lines changed: 5 additions & 5 deletions
@@ -140,7 +140,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
 			}

 			resources = append(resources, string(snapshot.ResourceKey()))
-			err = snapshotCollection.LoadComplete(snapshot)
+			err = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
 			if err != nil {
 				AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
 				return
@@ -164,7 +164,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
 			}

 			resources = append(resources, string(localRepo.Key()))
-			err = localCollection.LoadComplete(localRepo)
+			err = localCollection.LoadComplete(localRepo, collectionFactory.RefListCollection())
 			if err != nil {
 				AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
 			}
@@ -231,7 +231,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
 		}

-		err = collection.Add(published)
+		err = collection.Add(published, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
 		}
@@ -311,7 +311,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
 				return
 			}

-			err2 = snapshotCollection.LoadComplete(snapshot)
+			err2 = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
 			if err2 != nil {
 				AbortWithJSONError(c, 500, err2)
 				return
@@ -346,7 +346,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
 		}

-		err = collection.Update(published)
+		err = collection.Update(published, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
 		}

api/repos.go

Lines changed: 10 additions & 10 deletions
@@ -82,7 +82,7 @@ func apiReposCreate(c *gin.Context) {

 	collectionFactory := context.NewCollectionFactory()
 	collection := collectionFactory.LocalRepoCollection()
-	err := collection.Add(repo)
+	err := collection.Add(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 400, err)
 		return
@@ -132,7 +132,7 @@ func apiReposEdit(c *gin.Context) {
 		repo.DefaultComponent = *b.DefaultComponent
 	}

-	err = collection.Update(repo)
+	err = collection.Update(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, err)
 		return
@@ -201,7 +201,7 @@ func apiReposPackagesShow(c *gin.Context) {
 		return
 	}

-	err = collection.LoadComplete(repo)
+	err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, err)
 		return
@@ -229,7 +229,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
 		return
 	}

-	err = collection.LoadComplete(repo)
+	err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, err)
 		return
@@ -261,9 +261,9 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
 			}
 		}

-		repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
+		repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

-		err = collectionFactory.LocalRepoCollection().Update(repo)
+		err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
 		}
@@ -320,7 +320,7 @@ func apiReposPackageFromDir(c *gin.Context) {
 		return
 	}

-	err = collection.LoadComplete(repo)
+	err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
 	if err != nil {
 		AbortWithJSONError(c, 500, err)
 		return
@@ -369,9 +369,9 @@ func apiReposPackageFromDir(c *gin.Context) {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import package files: %s", err)
 		}

-		repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
+		repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

-		err = collectionFactory.LocalRepoCollection().Update(repo)
+		err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
 		if err != nil {
 			return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
 		}
@@ -489,7 +489,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
 		_, failedFiles2, err = deb.ImportChangesFiles(
 			changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
 			repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
-			context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
+			collectionFactory.RefListCollection(), context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
 		failedFiles = append(failedFiles, failedFiles2...)

 		if err != nil {
