Skip to content

Commit 3e2089d

Browse files
authored
Don't retry requeuing jobs with missing payloads (#36)
1 parent c9bf7ad commit 3e2089d

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

pool/node.go

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -749,11 +749,17 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
749749
if !ok {
750750
continue // worker is already being deleted
751751
}
752+
mustRequeue := len(keys)
752753
requeued := make(map[string]chan error)
753754
for _, key := range keys {
754755
payload, ok := node.jobPayloadsMap.Get(key)
755756
if !ok {
756757
node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id)
758+
// No need to keep the job around if the payload is not found.
759+
if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil {
760+
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id)
761+
}
762+
mustRequeue--
757763
continue
758764
}
759765
job := &Job{
@@ -770,16 +776,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
770776
requeued[job.Key] = cherr
771777
}
772778

773-
if len(requeued) != len(keys) {
774-
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id)
775-
continue
779+
if len(requeued) != mustRequeue {
780+
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id)
776781
}
777-
go node.processRequeuedJobs(ctx, id, requeued)
782+
go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue)
778783
}
779784
}
780785

781786
// processRequeuedJobs processes the requeued jobs concurrently.
782-
func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error) {
787+
func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error, deleteWorker bool) {
783788
var wg sync.WaitGroup
784789
var succeeded int64
785790
for key, cherr := range requeued {
@@ -806,8 +811,10 @@ func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued m
806811
}
807812

808813
node.logger.Info("requeued worker jobs", "worker", id, "requeued", len(requeued))
809-
if err := node.deleteWorker(ctx, id); err != nil {
810-
node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id)
814+
if deleteWorker {
815+
if err := node.deleteWorker(ctx, id); err != nil {
816+
node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id)
817+
}
811818
}
812819
}
813820

pool/worker.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -380,7 +380,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) {
380380
delete(rebalanced, key)
381381
cherrs[key] = cherr
382382
}
383-
go w.Node.processRequeuedJobs(ctx, w.ID, cherrs)
383+
go w.Node.processRequeuedJobs(ctx, w.ID, cherrs, false)
384384
}
385385

386386
// requeueJobs requeues the jobs handled by the worker.

0 commit comments

Comments
 (0)