Commit a2ebdc3

pool: prevent job loss during concurrent worker cleanup (#61)
* pool: prevent job loss during concurrent worker cleanup

  Fixes a race where job payloads could be deleted while another node is trying to
  requeue jobs during cascading worker shutdown/cleanup. Adds a regression test
  (TestJobLossDuringConcurrentWorkerCleanup) to reproduce the original issue.

* examples(weather): fix unmarshal short-read handling

  Use io.ReadFull so truncated/corrupted event payloads reliably return an error
  (tests expect ErrUnexpectedEOF).
1 parent 2fa7863 commit a2ebdc3

6 files changed, +390 -90 lines changed

examples/weather/services/poller/events.go

Lines changed: 10 additions & 13 deletions
@@ -3,6 +3,7 @@ package poller
 import (
 	"bytes"
 	"encoding/binary"
+	"io"

 	genpoller "goa.design/pulse/examples/weather/services/poller/gen/poller"
 )
@@ -37,17 +38,15 @@ func unmarshalLocation(data []byte) (*genpoller.Location, error) {
 		return nil, err
 	}
 	stateBytes := make([]byte, stateLen)
-	_, err = buf.Read(stateBytes)
-	if err != nil {
+	if _, err = io.ReadFull(buf, stateBytes); err != nil {
 		return nil, err
 	}
 	err = binary.Read(buf, binary.LittleEndian, &cityLen)
 	if err != nil {
 		return nil, err
 	}
 	cityBytes := make([]byte, cityLen)
-	_, err = buf.Read(cityBytes)
-	if err != nil {
+	if _, err = io.ReadFull(buf, cityBytes); err != nil {
 		return nil, err
 	}
 	var lat, long float64
@@ -92,8 +91,7 @@ func unmarshalForecastEvent(data []byte) (*genpoller.Forecast, error) {
 		return nil, err
 	}
 	locBytes := make([]byte, locLen)
-	_, err = buf.Read(locBytes)
-	if err != nil {
+	if _, err = io.ReadFull(buf, locBytes); err != nil {
 		return nil, err
 	}
 	loc, err := unmarshalLocation(locBytes)
@@ -112,8 +110,7 @@ func unmarshalForecastEvent(data []byte) (*genpoller.Forecast, error) {
 		return nil, err
 	}
 	periodBytes := make([]byte, periodLen)
-	_, err = buf.Read(periodBytes)
-	if err != nil {
+	if _, err = io.ReadFull(buf, periodBytes); err != nil {
 		return nil, err
 	}
 	period, err := unmarshalPeriod(periodBytes)
@@ -153,7 +150,7 @@ func unmarshalPeriod(data []byte) (*genpoller.Period, error) {
 		return nil, err
 	}
 	nameBytes := make([]byte, nameLen)
-	if _, err := buf.Read(nameBytes); err != nil {
+	if _, err := io.ReadFull(buf, nameBytes); err != nil {
 		return nil, err
 	}

@@ -162,7 +159,7 @@ func unmarshalPeriod(data []byte) (*genpoller.Period, error) {
 		return nil, err
 	}
 	startTimeBytes := make([]byte, startTimeLen)
-	if _, err := buf.Read(startTimeBytes); err != nil {
+	if _, err := io.ReadFull(buf, startTimeBytes); err != nil {
 		return nil, err
 	}

@@ -171,7 +168,7 @@ func unmarshalPeriod(data []byte) (*genpoller.Period, error) {
 		return nil, err
 	}
 	endTimeBytes := make([]byte, endTimeLen)
-	if _, err := buf.Read(endTimeBytes); err != nil {
+	if _, err := io.ReadFull(buf, endTimeBytes); err != nil {
 		return nil, err
 	}

@@ -185,7 +182,7 @@ func unmarshalPeriod(data []byte) (*genpoller.Period, error) {
 		return nil, err
 	}
 	tempUnitBytes := make([]byte, tempUnitLen)
-	if _, err := buf.Read(tempUnitBytes); err != nil {
+	if _, err := io.ReadFull(buf, tempUnitBytes); err != nil {
 		return nil, err
 	}

@@ -194,7 +191,7 @@ func unmarshalPeriod(data []byte) (*genpoller.Period, error) {
 		return nil, err
 	}
 	summaryBytes := make([]byte, summaryLen)
-	if _, err := buf.Read(summaryBytes); err != nil {
+	if _, err := io.ReadFull(buf, summaryBytes); err != nil {
 		return nil, err
 	}

examples/weather/services/poller/marshal.go

Lines changed: 3 additions & 2 deletions
@@ -3,6 +3,7 @@ package poller
 import (
 	"bytes"
 	"encoding/binary"
+	"io"
 )

 // marshalCityAndState marshals a city and state into a byte slice using binary encoding.
@@ -26,7 +27,7 @@ func unmarshalCityAndState(data []byte) (city, state string, err error) {
 		return
 	}
 	stateBytes := make([]byte, stateLen)
-	_, err = buf.Read(stateBytes)
+	_, err = io.ReadFull(buf, stateBytes)
 	if err != nil {
 		return
 	}
@@ -35,7 +36,7 @@ func unmarshalCityAndState(data []byte) (city, state string, err error) {
 		return
 	}
 	cityBytes := make([]byte, cityLen)
-	_, err = buf.Read(cityBytes)
+	_, err = io.ReadFull(buf, cityBytes)
 	if err != nil {
 		return
 	}
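
Both weather fixes hinge on the same stdlib behavior: bytes.Buffer.Read returns however many bytes are available with a nil error, so a truncated payload used to fill the remaining bytes with zeros instead of failing, whereas io.ReadFull reports the short read as io.ErrUnexpectedEOF, which is what the tests assert. A minimal standalone sketch of the difference (not part of this commit):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// Pretend a length prefix promised 10 bytes but only 3 made it into the payload.
	p := make([]byte, 10)

	// bytes.Buffer.Read: short read with a nil error - the truncation goes unnoticed.
	n, err := bytes.NewBuffer([]byte("abc")).Read(p)
	fmt.Println(n, err) // 3 <nil>

	// io.ReadFull: same input, but the truncation is surfaced as an error.
	n, err = io.ReadFull(bytes.NewBuffer([]byte("abc")), p)
	fmt.Println(n, err) // 3 unexpected EOF
}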

pool/node.go

Lines changed: 113 additions & 50 deletions
@@ -58,6 +58,7 @@ type (
 		nodeStreams        sync.Map // streams for worker acks indexed by ID
 		pendingJobChannels sync.Map // channels used to send DispatchJob results, nil if event is requeued
 		pendingEvents      sync.Map // pending events indexed by sender and event IDs
+		orphanedPayloads   sync.Map // job key -> first time observed orphaned payload (unix nanos)

 		lock    sync.RWMutex
 		closing bool
@@ -355,7 +356,10 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
 }

 func (node *Node) dispatchJob(ctx context.Context, key string, job []byte, requeue bool) error {
-	if node.IsClosed() {
+	// Allow internal requeue operations to proceed while the node is closing.
+	// External callers use DispatchJob which passes requeue=false and should be
+	// rejected once Close begins.
+	if node.IsClosed() && !requeue {
 		return fmt.Errorf("DispatchJob: pool %q is closed", node.PoolName)
 	}

@@ -553,16 +557,19 @@ func (node *Node) close(ctx context.Context, shutdown bool) error {
 		node.stopAllJobs(ctx)
 	}

-	// Stop all workers before waiting for goroutines
+	// Stop all workers before waiting for goroutines.
+	//
+	// IMPORTANT: do NOT remove workers from the replicated maps here.
+	// Removing the worker deletes the worker->jobs mapping which is what other
+	// nodes use to recover/requeue jobs if this node dies mid-close. We only
+	// remove workers from maps after we've attempted to requeue.
 	var wg sync.WaitGroup
 	node.localWorkers.Range(func(key, value any) bool {
 		worker := value.(*Worker)
 		wg.Add(1)
 		pulse.Go(node.logger, func() {
 			defer wg.Done()
 			worker.stop(ctx)
-			// Remove worker immediately to avoid job requeuing by other nodes
-			node.removeWorker(ctx, worker.ID)
 		})
 		return true
 	})
@@ -572,13 +579,24 @@ func (node *Node) close(ctx context.Context, shutdown bool) error {
 	close(node.stop)
 	node.wg.Wait()

-	// Requeue jobs if not shutting down, after stopping goroutines to avoid receiving new jobs
+	// Requeue jobs if not shutting down.
+	//
+	// This is done after stopping node goroutines so we don't route any new pool
+	// events to workers that have already been stopped.
 	if !shutdown {
 		if err := node.requeueAllJobs(ctx); err != nil {
 			node.logger.Error(fmt.Errorf("close: failed to requeue jobs: %w", err))
 		}
 	}

+	// Now that we attempted requeue, remove all local workers from pool maps.
+	node.localWorkers.Range(func(key, value any) bool {
+		worker := value.(*Worker)
+		node.removeWorker(ctx, worker.ID)
+		node.localWorkers.Delete(key)
+		return true
+	})
+
 	// Cleanup resources
 	node.cleanupNode(ctx)

@@ -959,6 +977,65 @@ func (node *Node) cleanupInactiveWorkers(ctx context.Context) {
 		node.logger.Info("cleanupInactiveWorkers: found inactive worker", "worker", workerID)
 		node.cleanupWorker(ctx, workerID)
 	}
+
+	// Also recover any jobs that still have payloads but are missing from the job map.
+	// This can happen transiently during cascading failures and is preferable to leaving
+	// jobs "stuck" (payload exists, but no worker owns the job).
+	node.requeueOrphanedPayloads(ctx)
+}
+
+// requeueOrphanedPayloads detects payloads for job keys that are not present in
+// the job map and requeues them after a short grace period.
+func (node *Node) requeueOrphanedPayloads(ctx context.Context) {
+	// Build a set of all job keys referenced by the job map.
+	existingJobs := make(map[string]struct{})
+	for _, jobs := range node.jobMap.Map() {
+		for _, key := range strings.Split(jobs, ",") {
+			if key == "" {
+				continue
+			}
+			existingJobs[key] = struct{}{}
+		}
+	}
+
+	// Use a short grace period: we want recovery to be fast under churn,
+	// but still avoid requeuing during brief map inconsistencies.
+	grace := 2 * node.workerTTL
+	if grace < node.ackGracePeriod {
+		grace = node.ackGracePeriod
+	}
+
+	now := time.Now()
+	for key := range node.jobPayloadMap.Map() {
+		if _, ok := existingJobs[key]; ok {
+			node.orphanedPayloads.Delete(key)
+			continue
+		}
+
+		firstAny, ok := node.orphanedPayloads.Load(key)
+		if !ok {
+			node.orphanedPayloads.Store(key, now.UnixNano())
+			continue
+		}
+		firstNS, _ := firstAny.(int64)
+		if firstNS == 0 || now.Sub(time.Unix(0, firstNS)) < grace {
+			continue
+		}
+
+		payload, ok := node.JobPayload(key)
+		if !ok {
+			node.orphanedPayloads.Delete(key)
+			continue
+		}
+		job := &Job{Key: key, Payload: payload, CreatedAt: now, NodeID: node.ID}
+		if _, err := node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
+			node.logger.Error(fmt.Errorf("requeueOrphanedPayloads: failed to requeue orphaned job: %w", err), "key", key)
+			continue
+		}
+
+		node.orphanedPayloads.Delete(key)
+		node.logger.Info("requeueOrphanedPayloads: requeued orphaned job", "key", key, "grace", grace)
+	}
 }

 // cleanupWorker requeues the jobs assigned to the worker and deletes it from
@@ -981,23 +1058,38 @@ func (node *Node) cleanupWorker(ctx context.Context, workerID string) {
 	}

 	// Requeue jobs and process them
-	var requeued int
+	var (
+		requeued  int // jobs successfully requeued
+		processed int // jobs that were either requeued or cleaned up as stale
+	)
 	for _, key := range keys {
 		payload, ok := node.JobPayload(key)
 		if !ok {
-			node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to get job payload"), "job", key, "worker", workerID)
-			requeued++ // We will never be able to requeue this job
+			// The job key can remain in the jobs map even if the payload has already
+			// been removed (e.g. the job was stopped, or another node already handled
+			// the requeue). Treat it as a stale entry and remove it so future cleanup
+			// attempts don't keep looping on it.
+			if _, _, err := node.jobMap.RemoveValues(ctx, workerID, key); err != nil {
+				node.logger.Error(fmt.Errorf("cleanupWorker: failed to remove stale job from jobs map: %w", err), "job", key, "worker", workerID)
+				continue
+			}
+			node.logger.Info("cleanupWorker: removed stale job key with missing payload", "job", key, "worker", workerID)
+			processed++
 			continue
 		}
-		job := &Job{Key: key, Payload: []byte(payload), CreatedAt: time.Now(), NodeID: node.ID}
-		if err := node.dispatchJob(ctx, job.Key, marshalJob(job), true); err != nil {
+		job := &Job{Key: key, Payload: payload, CreatedAt: time.Now(), NodeID: node.ID}
+		// Requeue by adding an event back to the pool stream.
+		// We intentionally do not wait for the job to start (which can time out
+		// under heavy churn) - the pool sink will retry routing until it is acked.
+		if _, err := node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
 			node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to requeue job: %w", err), "job", job.Key, "worker", workerID)
 			continue
 		}
 		requeued++
+		processed++
 	}
-	if len(keys) != requeued {
-		node.logger.Info("partially requeued stale worker jobs", "requeued", requeued, "jobs", len(keys), "worker", workerID)
+	if len(keys) != processed {
+		node.logger.Info("partially processed stale worker jobs", "requeued", requeued, "processed", processed, "jobs", len(keys), "worker", workerID)
 		return
 	}

@@ -1014,21 +1106,12 @@ func (node *Node) processInactiveJobs(ctx context.Context) {
 	ticker := time.NewTicker(node.ackGracePeriod) // Run at ackGracePeriod frequency since pending jobs expire after 2*ackGracePeriod
 	defer ticker.Stop()

-	payloadCleanupTicker, err := node.NewTicker(ctx, "jobPayloadCleanup", node.workerTTL)
-	if err != nil {
-		node.logger.Error(fmt.Errorf("processInactiveJobs: failed to create payload cleanup ticker: %w", err))
-		return
-	}
-	defer payloadCleanupTicker.Stop()
-
 	for {
 		select {
 		case <-node.stop:
 			return
 		case <-ticker.C:
 			node.cleanupStalePendingJobs(ctx)
-		case <-payloadCleanupTicker.C:
-			node.cleanupOrphanedJobPayloads(ctx)
 		}
 	}
 }
@@ -1050,29 +1133,6 @@ func (node *Node) cleanupStalePendingJobs(ctx context.Context) {
 	}
 }

-// cleanupOrphanedJobPayloads checks for and removes entries in the job payload map
-// that don't have a corresponding entry in the job map.
-func (node *Node) cleanupOrphanedJobPayloads(ctx context.Context) {
-	// Get all existing job keys from the job map
-	existingJobs := make(map[string]struct{})
-	for _, jobs := range node.jobMap.Map() {
-		for _, key := range strings.Split(jobs, ",") {
-			existingJobs[key] = struct{}{}
-		}
-	}
-
-	// Check each payload entry
-	for key := range node.jobPayloadMap.Map() {
-		if _, exists := existingJobs[key]; !exists {
-			if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil {
-				node.logger.Error(fmt.Errorf("cleanupOrphanedJobPayloads: failed to delete orphaned payload for job %q: %w", key, err))
-				continue
-			}
-			node.logger.Info("cleanupOrphanedJobPayloads: removed orphaned payload", "key", key)
-		}
-	}
-}
-
 // acquireCleanupLock tries to acquire the cleanup lock for a worker.
 // It returns true if the lock was acquired, false if another node holds the lock.
 // It will clear any stale or invalid locks it finds.
@@ -1226,12 +1286,15 @@ func (node *Node) removeWorkerFromMaps(ctx context.Context, id string) {
 	if _, err := node.workerCleanupMap.Delete(ctx, id); err != nil {
 		node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove cleanup timestamp: %w", err), "worker", id)
 	}
-	jobKeys, _ := node.jobMap.GetValues(id)
-	for _, key := range jobKeys {
-		if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil {
-			node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove job %s from payload map: %w", key, err))
-		}
-	}
+	// NOTE: Do not delete job payloads here.
+	//
+	// Payload entries are job-scoped (not worker-scoped) and are required to
+	// safely requeue jobs from a stale worker during distributed cleanup. Deleting
+	// payloads during worker removal can race with another node performing
+	// cleanup/requeue and lead to permanent job loss.
+	//
+	// Payloads are deleted when jobs stop (see Worker.stopJob) and any remaining
+	// orphaned payloads are eventually collected by cleanupOrphanedJobPayloads.
 	if _, err := node.jobMap.Delete(ctx, id); err != nil {
 		node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove worker %s from jobs map: %w", id, err))
 	}
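
The new requeueOrphanedPayloads method boils down to a "first seen + grace period" pattern: remember when a payload was first observed without an owning job, and only requeue it once it has stayed orphaned for the entire grace period, so brief replicated-map inconsistencies don't trigger spurious requeues. A distilled, pulse-free sketch of that pattern (the orphanTracker type and the plain-map arguments below are hypothetical, for illustration only):

package main

import (
	"fmt"
	"time"
)

// orphanTracker remembers when each payload key was first seen without an
// owning job and reports keys whose grace period has fully elapsed.
type orphanTracker struct {
	grace     time.Duration
	firstSeen map[string]time.Time // job key -> first time observed orphaned
}

// sweep takes snapshots of the payload and job key sets (stand-ins for the
// replicated maps) and returns the keys that should be requeued now.
func (t *orphanTracker) sweep(payloads, jobs map[string]struct{}, now time.Time) []string {
	var due []string
	for key := range payloads {
		if _, owned := jobs[key]; owned {
			delete(t.firstSeen, key) // a job owns this payload again; forget it
			continue
		}
		first, seen := t.firstSeen[key]
		if !seen {
			t.firstSeen[key] = now // first sighting: start the grace timer
			continue
		}
		if now.Sub(first) >= t.grace {
			due = append(due, key) // orphaned for the whole grace period: recover it
			delete(t.firstSeen, key)
		}
	}
	return due
}

func main() {
	t := &orphanTracker{grace: 20 * time.Second, firstSeen: map[string]time.Time{}}
	payloads := map[string]struct{}{"job-a": {}}
	jobs := map[string]struct{}{} // no worker owns job-a

	start := time.Now()
	fmt.Println(t.sweep(payloads, jobs, start))                     // [] - timer just started
	fmt.Println(t.sweep(payloads, jobs, start.Add(30*time.Second))) // [job-a] - past the grace period
}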
