From 2827620cfe25a8278e4f6e0d269bbfc80cb1e93a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:40:37 +0000 Subject: [PATCH 1/8] fix(publish): pre-register published repo key before task submission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit apiPublishRepoOrSnapshot appended published.Key() to resources inside the task closure, after maybeRunTaskInBackground had already been called. The task's locked-resource set is fixed at submission time, so that append had no effect — the published repo key was never registered as a resource. Two concurrent POST /api/publish/{prefix} requests for the same prefix/distribution therefore did not conflict in the task queue: both ran in parallel, each loaded an empty PublishedRepoCollection from the DB, both passed CheckDuplicate, and the second Add silently overwrote the first. Fix: compute the published repo key ("U{storagePrefix}>>{distribution}") from the already-known storage/prefix/distribution values and append it to resources before calling maybeRunTaskInBackground, so concurrent creates for the same destination are serialised by the task queue. The now-dead append inside the closure is removed. --- api/publish.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/api/publish.go b/api/publish.go index 67b260d47..b0cedfc08 100644 --- a/api/publish.go +++ b/api/publish.go @@ -300,6 +300,17 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { collection := collectionFactory.PublishedRepoCollection() + // Pre-register the published repo key in resources so that concurrent + // POST requests for the same prefix/distribution are serialized by the + // task queue rather than racing on CheckDuplicate + Add. + if b.Distribution != "" { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + resources = append(resources, "U"+storagePrefix+">>"+b.Distribution) + } + taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { @@ -332,8 +343,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } From 2a5992c74eb6f396335fb62f57dde9376825f364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:41:58 +0000 Subject: [PATCH 2/8] fix(publish): reload published inside task for source-management endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Affected endpoints: apiPublishAddSource, apiPublishSetSources, apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges. All five handlers shared the same flawed pattern: they loaded the published repo from the DB and mutated it (ObtainRevision / DropRevision) outside the task closure, before the task lock was acquired. Each task closure then just wrote back the already-mutated, pre-lock object. Because the task queue serialises tasks that share a resource key, two concurrent requests appear safe — but each task closure holds a stale copy of the object captured before the lock was taken: Request A loads published: revision = {} Request B loads published: revision = {} <- same DB state A mutates: revision = {main: snap1} B mutates: revision = {contrib: snap2} Task A runs: saves {main: snap1} OK Task B runs: saves {contrib: snap2} <- clobbers A's change Fix: perform only a shallow ByStoragePrefixDistribution outside the task (for the early 404 response, resource key, and task name). Inside the task closure a dedicated taskCollectionFactory is created, the published repo is re-read fresh from the DB (after the lock is acquired), and LoadComplete + all mutations + Update are executed against that authoritative copy. --- api/publish.go | 234 +++++++++++++++++++++++++++++-------------------- 1 file changed, 138 insertions(+), 96 deletions(-) diff --git a/api/publish.go b/api/publish.go index b0cedfc08..73e049fc9 100644 --- a/api/publish.go +++ b/api/publish.go @@ -648,43 +648,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - revision := published.ObtainRevision() - sources := revision.Sources + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - component := b.Component - name := b.Name + revision := published.ObtainRevision() + sources := revision.Sources - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } + component := b.Component + name := b.Name - sources[component] = name + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -766,39 +775,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -831,24 +849,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -884,51 +911,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - b.Component = component - b.Name = revision.Sources[component] + revision := published.ObtainRevision() + sources := revision.Sources - if c.Bind(&b) != nil { - return - } + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } - if b.Component != component { - delete(sources, component) - } + if b.Component != urlComponent { + delete(sources, urlComponent) + } - component = b.Component - name := b.Name - sources[component] = name + newComponent := b.Component + name := b.Name + sources[newComponent] = name - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -965,33 +999,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources - delete(sources, component) + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } From b7969c7a2d18dc8e41fb9945460440f1e32bda27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:42:10 +0000 Subject: [PATCH 3/8] fix(publish): reload published inside task for update/switch endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Affected endpoints: apiPublishUpdateSwitch (PUT), apiPublishUpdate (POST). Both handlers loaded the published repo and mutated scalar fields (Label, Origin, SkipContents, SkipBz2, AcquireByHash, SignedBy, MultiDist, Version) outside the task closure, before the lock was acquired. Inside the task, LoadComplete only refreshed sourceItems — it did not reload scalar fields or the Revision. Two concurrent requests therefore each operated on a stale base: Request A loads published (Label="old"), sets Label="A" Request B loads published (Label="old"), sets Label="B" Task A runs: Update() + Publish() + collection.Update() -> saves Label="A" Task B runs: Update() on B's stale copy -> saves Label="B", silently discarding A's Label change and potentially reconciling a Revision built against the pre-A state. Fix: remove all field mutations and the LoadComplete call from the HTTP handler. Inside the task, a fresh taskCollectionFactory is created, the published repo is re-read via ByStoragePrefixDistribution + LoadComplete (obtaining the current DB state after the lock is held), and then all field mutations are applied before Update / Publish / collection.Update. --- api/publish.go | 163 ++++++++++++++++++++++++++----------------------- 1 file changed, 85 insertions(+), 78 deletions(-) diff --git a/api/publish.go b/api/publish.go index 73e049fc9..4e4b75f84 100644 --- a/api/publish.go +++ b/api/publish.go @@ -492,46 +492,50 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -543,17 +547,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -561,7 +565,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -1105,64 +1109,67 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if b.Label != nil { - published.Label = *b.Label - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Origin != nil { - published.Origin = *b.Origin - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Version != nil { - published.Version = *b.Version - } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1170,7 +1177,7 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } From 9e91ee4c4af06ffd681f72279edbe04a7700176c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Sun, 24 May 2026 19:46:09 +0000 Subject: [PATCH 4/8] fix(publish): reload published inside task for create/drop endpoints Affected endpoints: apiPublishRepoOrSnapshot (POST /api/publish/{prefix}), apiPublishDrop (DELETE /api/publish/{prefix}/{distribution}). Both handlers used the outer-scoped collectionFactory and collection variables inside the task closure. These were captured before the task lock was acquired, so under concurrent load each task operated on a stale DB view: apiPublishRepoOrSnapshot: snapshot/localRepo LoadComplete, NewPublishedRepo, CheckDuplicate, Publish, and collection.Add all used the pre-lock collectionFactory/collection. Two concurrent POST to same prefix could both pass CheckDuplicate (neither sees the other in the stale DB view) and race on disk writes. apiPublishDrop: collection.Remove used pre-lock collection, potentially racing with concurrent updates/other drops. Fix: inside the task closure create a fresh taskCollectionFactory and taskCollection. All DB reads (LoadComplete) and writes (CheckDuplicate, Add, Remove, Publish) now run against the authoritative DB state after the lock is held. --- api/publish.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/api/publish.go b/api/publish.go index 4e4b75f84..29043785a 100644 --- a/api/publish.go +++ b/api/publish.go @@ -298,8 +298,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } - collection := collectionFactory.PublishedRepoCollection() - // Pre-register the published repo key in resources so that concurrent // POST requests for the same prefix/distribution are serialized by the // task queue rather than racing on CheckDuplicate + Add. @@ -314,6 +312,9 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -325,10 +326,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -338,7 +339,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } @@ -376,18 +377,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -615,8 +616,11 @@ func apiPublishDrop(c *gin.Context) { resources := []string{string(published.Key())} taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) } From 9ecbc844e7eadfcd213399f79841200cc068a4fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Sun, 24 May 2026 20:09:13 +0000 Subject: [PATCH 5/8] fix(publish): warn when distribution missing and resource key cannot be pre-registered When b.Distribution is empty, the pre-registered resource key U:>> cannot be constructed, so concurrent POST requests to the same prefix are not serialized by the task queue. Add a log warning so operators are aware of the gap. --- api/publish.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/publish.go b/api/publish.go index 29043785a..8ba32fd97 100644 --- a/api/publish.go +++ b/api/publish.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "log" "net/http" "strings" @@ -307,6 +308,8 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { storagePrefix = storage + ":" + prefix } resources = append(resources, "U"+storagePrefix+">>"+b.Distribution) + } else { + log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix) } taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", From 8f2b335409c70b0afc9a948c7c2c35f04ca58c58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 10:15:59 +0200 Subject: [PATCH 6/8] fix(publish): lock source repos/snapshots on publish update switch Affected endpoint: apiPublishUpdateSwitch (PUT /api/publish/{prefix}/{distribution}). The handler registered only the published repo key as a task resource. The underlying source repos (for local) or snapshots (for snapshot-based published repos) were not locked. Concurrent updates to a source repo or snapshot while a publish-update/switch task was running could produce inconsistent published indexes: Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot Request B: modifies same source repo or snapshot (add/remove packages, etc) Task A: Update() + Publish() reads stale/modified source -> inconsistent published index, or partial write if source deleted mid-task. Fix: for SourceLocalRepo, iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append string(repo.Key()) to resources. For SourceSnapshot, iterate b.Snapshots, look up each snapshot via snapshotCollection.ByName and append string(snapshot.ResourceKey()) to resources. Task queue now serialises against both the published repo and all its sources. --- api/publish.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/api/publish.go b/api/publish.go index 8ba32fd97..6a1a23d2c 100644 --- a/api/publish.go +++ b/api/publish.go @@ -471,6 +471,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -478,18 +479,29 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } } else if published.SourceKind == deb.SourceSnapshot { for _, snapshotInfo := range b.Snapshots { - _, err2 := snapshotCollection.ByName(snapshotInfo.Name) + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(snapshot.ResourceKey())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) @@ -498,7 +510,6 @@ func apiPublishUpdateSwitch(c *gin.Context) { // Field mutations and fresh DB load are deferred to inside the task so // they always operate on a consistent state after the lock is held. - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { taskCollectionFactory := context.NewCollectionFactory() From d44ae522ac98f8368c4a60f3f5402a9df77f0b78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 09:47:22 +0000 Subject: [PATCH 7/8] fix(publish): lock source repos/snapshots on publish update endpoint --- api/publish.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/api/publish.go b/api/publish.go index 6a1a23d2c..2a6b0c0ba 100644 --- a/api/publish.go +++ b/api/publish.go @@ -1136,6 +1136,33 @@ func apiPublishUpdate(c *gin.Context) { } resources := []string{string(published.Key())} + + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. + snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() + + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.ResourceKey())) + } + } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { taskCollectionFactory := context.NewCollectionFactory() From 68814ff1f04c954208bc0ba38de70330061f3a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 11:57:47 +0200 Subject: [PATCH 8/8] docs: fix typo --- CONTRIBUTING.md | 2 +- Makefile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d4c32b7f2..c0d51e56a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -137,7 +137,7 @@ make docker-unit-tests In order to run aptly system tests, enter the following: ``` -make docker-system-tests +make docker-system-test ``` #### Running golangci-lint diff --git a/Makefile b/Makefile index 770d9d9dc..ab3f4e3d2 100644 --- a/Makefile +++ b/Makefile @@ -241,4 +241,4 @@ clean: ## remove local build and module cache rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true -.PHONY: help man prepare swagger version binaries build docker-release docker-system-tests docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8 +.PHONY: help man prepare swagger version binaries build docker-release docker-system-test docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8