
Commit 274a813

Properly delete stale workers with no job (#37)
* Properly delete stale workers with no job
* Only stop the proper worker
* Allow for jobs with no payloads
* Fix race condition in test
* Properly shutdown node in test
1 parent 3e2089d commit 274a813

6 files changed (+139, -22 lines)

docker-compose/docker-compose-redis.yaml

Lines changed: 1 addition & 1 deletion
@@ -2,6 +2,6 @@ services:
   redis:
     image: "redis"
     container_name: "pulse-redis"
-    command: redis-server --save 20 1 --loglevel warning --requirepass ${REDIS_PASSWORD}
+    command: redis-server --save "" --loglevel warning --requirepass ${REDIS_PASSWORD}
     ports:
       - "6379:6379"

pool/node.go

Lines changed: 31 additions & 17 deletions
@@ -620,6 +620,23 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
 	if node.closing {
 		return
 	}
+	// First cleanup the local workers that are no longer active.
+	for _, worker := range node.localWorkers {
+		if _, ok := node.workerMap.Get(worker.ID); !ok {
+			// If it's not in the worker map, then it's not active and its jobs
+			// have already been requeued.
+			node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
+			worker.stopAndWait(ctx)
+			for i, w := range node.localWorkers {
+				if worker.ID == w.ID {
+					node.localWorkers = append(node.localWorkers[:i], node.localWorkers[i+1:]...)
+					break
+				}
+			}
+		}
+	}
+
+	// Then rebalance the jobs across the remaining active workers.
 	activeWorkers := node.activeWorkers()
 	if len(activeWorkers) == 0 {
 		return
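
For illustration only (not in the diff): the loop above stops any local worker whose ID has vanished from the replicated worker map and splices it out of node.localWorkers. A standalone sketch of that slice-removal idiom, with a stand-in type and a hypothetical helper name:

package main

import "fmt"

// worker is a stand-in for the pool's Worker; only the ID field matters here.
type worker struct{ ID string }

// removeWorkerByID mirrors the append(s[:i], s[i+1:]...) idiom used in
// handleWorkerMapUpdate above: it drops the worker with the given ID while
// keeping the order of the remaining entries. Hypothetical helper, not part
// of the commit.
func removeWorkerByID(workers []*worker, id string) []*worker {
	for i, w := range workers {
		if w.ID == id {
			return append(workers[:i], workers[i+1:]...)
		}
	}
	return workers
}

func main() {
	ws := []*worker{{ID: "a"}, {ID: "b"}, {ID: "c"}}
	ws = removeWorkerByID(ws, "b")
	for _, w := range ws {
		fmt.Println(w.ID) // prints a, then c
	}
}
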
@@ -727,10 +744,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
 			continue
 		}
 		lastSeen := time.Unix(0, lsi)
-		if time.Since(lastSeen) <= node.workerTTL {
+		lsd := time.Since(lastSeen)
+		if lsd <= node.workerTTL {
 			continue
 		}
-		node.logger.Debug("processInactiveWorkers: removing worker", "worker", id)
+		node.logger.Debug("processInactiveWorkers: removing worker", "worker", id, "last-seen", lsd, "ttl", node.workerTTL)

 		// Use optimistic locking to set the keep-alive timestamp to a value
 		// in the future so that another node does not also requeue the jobs.
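
As an aside, not part of the commit: the keep-alive entries read here store the last-seen time as a UnixNano value encoded with strconv (the new tests below write the same format), and a worker counts as stale once it has been silent for longer than workerTTL. A self-contained sketch of just that check, using an illustrative helper name:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// isStale reports whether a keep-alive entry is older than the worker TTL.
// The value format (UnixNano encoded with strconv.FormatInt) matches what the
// new tests write into keepAliveMap; the helper itself is illustrative only.
func isStale(lastSeenNanos string, ttl time.Duration) (bool, error) {
	lsi, err := strconv.ParseInt(lastSeenNanos, 10, 64)
	if err != nil {
		return false, fmt.Errorf("invalid keep-alive value %q: %w", lastSeenNanos, err)
	}
	return time.Since(time.Unix(0, lsi)) > ttl, nil
}

func main() {
	v := strconv.FormatInt(time.Now().Add(-2*time.Minute).UnixNano(), 10)
	stale, err := isStale(v, 30*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(stale) // true: last seen 2 minutes ago, TTL 30s
}
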
@@ -747,21 +765,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {

 		keys, ok := node.jobsMap.GetValues(id)
 		if !ok {
-			continue // worker is already being deleted
+			// Worker has no jobs, so delete it right away.
+			if err := node.deleteWorker(ctx, id); err != nil {
+				node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to delete worker %q: %w", id, err), "worker", id)
+			}
+			continue
 		}
-		mustRequeue := len(keys)
 		requeued := make(map[string]chan error)
 		for _, key := range keys {
-			payload, ok := node.jobPayloadsMap.Get(key)
-			if !ok {
-				node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id)
-				// No need to keep the job around if the payload is not found.
-				if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil {
-					node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id)
-				}
-				mustRequeue--
-				continue
-			}
+			payload, _ := node.jobPayloadsMap.Get(key) // Some jobs have no payload
 			job := &Job{
 				Key:     key,
 				Payload: []byte(payload),
@@ -776,10 +788,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
 			requeued[job.Key] = cherr
 		}

-		if len(requeued) != mustRequeue {
-			node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id)
+		allRequeued := len(requeued) == len(keys)
+		if !allRequeued {
+			node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id)
 		}
-		go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue)
+		go node.processRequeuedJobs(ctx, id, requeued, allRequeued)
 	}
 }
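
Also for illustration, not in the diff: with mustRequeue gone, completeness is now simply len(requeued) == len(keys). A simplified sketch of that bookkeeping, where the requeue callback is a stand-in rather than the pool's actual requeue path:

package main

import (
	"context"
	"fmt"
)

// requeueAll sketches the bookkeeping used above: try to requeue every job
// key, remember the result channel of each success, and report whether all
// keys were requeued so the caller can decide to delete the worker or retry
// later. The requeue callback is illustrative only.
func requeueAll(ctx context.Context, keys []string, requeue func(context.Context, string) (chan error, error)) (map[string]chan error, bool) {
	requeued := make(map[string]chan error)
	for _, key := range keys {
		cherr, err := requeue(ctx, key)
		if err != nil {
			fmt.Printf("failed to requeue %q: %v\n", key, err)
			continue
		}
		requeued[key] = cherr
	}
	return requeued, len(requeued) == len(keys)
}

func main() {
	stub := func(_ context.Context, _ string) (chan error, error) {
		ch := make(chan error, 1)
		ch <- nil
		return ch, nil
	}
	_, all := requeueAll(context.Background(), []string{"job-1", "job-2"}, stub)
	fmt.Println("all requeued:", all)
}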

@@ -866,6 +879,7 @@ func (node *Node) activeWorkers() []string {

 // deleteWorker removes a worker from the pool deleting the worker stream.
 func (node *Node) deleteWorker(ctx context.Context, id string) error {
+	node.logger.Debug("deleteWorker: deleting worker", "worker", id)
 	if _, err := node.keepAliveMap.Delete(ctx, id); err != nil {
 		node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker %q from keep-alive map: %w", id, err))
 	}

pool/ticker_test.go

Lines changed: 2 additions & 0 deletions
@@ -31,7 +31,9 @@ func TestNewTicker(t *testing.T) {
 	assert.WithinDuration(t, startTime.Add(tickDuration), firstTick, time.Second, "First tick should occur after approximately one tick duration")

 	// Verify next tick time and duration
+	ticker.lock.Lock()
 	nextTickTime, tickerDuration := deserialize(ticker.next)
+	ticker.lock.Unlock()
 	assert.WithinDuration(t, startTime.Add(tickDuration), nextTickTime, time.Second, "Next tick time should be approximately one tick duration from start")
 	assert.Equal(t, tickDuration, tickerDuration, "Ticker duration should match the specified duration")
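
Side note, not part of the commit: the race comes from the ticker goroutine rewriting ticker.next while the test reads it, so the read now happens under ticker.lock. A generic illustration of the same pattern, with stand-in names rather than the real Ticker, that keeps go test -race quiet:

package main

import (
	"fmt"
	"sync"
	"time"
)

// tickerState is a stand-in for the real Ticker: a background goroutine keeps
// rewriting next, so readers must take the same lock before looking at it.
type tickerState struct {
	lock sync.Mutex
	next string
}

func (t *tickerState) snapshotNext() string {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.next
}

func main() {
	t := &tickerState{}
	go func() {
		for i := 0; ; i++ {
			t.lock.Lock()
			t.next = fmt.Sprintf("tick-%d", i)
			t.lock.Unlock()
			time.Sleep(10 * time.Millisecond)
		}
	}()
	time.Sleep(50 * time.Millisecond)
	fmt.Println(t.snapshotNext())
}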

pool/worker_test.go

Lines changed: 86 additions & 3 deletions
@@ -2,6 +2,7 @@ package pool

 import (
 	"context"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -32,9 +33,7 @@ func TestWorkerRequeueJobs(t *testing.T) {

 	// Emulate the worker failing by preventing it from refreshing its keepalive
 	// This means we can't cleanup cleanly, hence "false" in CleanupRedis
-	worker.lock.Lock()
-	worker.stopped = true
-	worker.lock.Unlock()
+	worker.stopAndWait(ctx)

 	// Create a new worker to pick up the requeued job
 	newWorker := newTestWorker(t, ctx, node)
@@ -50,4 +49,88 @@ func TestWorkerRequeueJobs(t *testing.T) {
 	require.Eventually(t, func() bool {
 		return len(newWorker.Jobs()) == 2
 	}, time.Second, delay, "job was not requeued")
+
+	// Cleanup
+	assert.NoError(t, node.Shutdown(ctx))
+}
+
+func TestStaleWorkerCleanupInNode(t *testing.T) {
+	var (
+		ctx      = ptesting.NewTestContext(t)
+		testName = strings.Replace(t.Name(), "/", "_", -1)
+		rdb      = ptesting.NewRedisClient(t)
+		node     = newTestNode(t, ctx, rdb, testName)
+	)
+	defer ptesting.CleanupRedis(t, rdb, false, testName)
+
+	// Create one active worker
+	activeWorker := newTestWorker(t, ctx, node)
+	t.Log("active worker", activeWorker.ID)
+
+	// Create five stale workers
+	staleWorkers := make([]*Worker, 5)
+	for i := 0; i < 5; i++ {
+		staleWorkers[i] = newTestWorker(t, ctx, node)
+		staleWorkers[i].stop(ctx)
+		// Set the last seen time to a past time
+		_, err := node.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node.workerTTL).UnixNano(), 10))
+		assert.NoError(t, err)
+	}
+
+	// Wait for the cleanup process to run
+	time.Sleep(3 * node.workerTTL)
+
+	// Check if only the active worker remains
+	workers := node.activeWorkers()
+	assert.Len(t, workers, 1, "There should be only one worker remaining")
+	assert.Contains(t, workers, activeWorker.ID, "The active worker should still exist")
+
+	// Cleanup
+	assert.NoError(t, node.Shutdown(ctx))
+}
+
+func TestStaleWorkerCleanupAcrossNodes(t *testing.T) {
+	var (
+		ctx      = ptesting.NewTestContext(t)
+		testName = strings.Replace(t.Name(), "/", "_", -1)
+		rdb      = ptesting.NewRedisClient(t)
+		node1    = newTestNode(t, ctx, rdb, testName+"_1")
+		node2    = newTestNode(t, ctx, rdb, testName+"_2")
+	)
+	defer ptesting.CleanupRedis(t, rdb, false, testName)
+
+	// Create one active worker on node1
+	activeWorker := newTestWorker(t, ctx, node1)
+
+	// Create five stale workers on node2
+	staleWorkers := make([]*Worker, 5)
+	for i := 0; i < 5; i++ {
+		staleWorkers[i] = newTestWorker(t, ctx, node2)
+		staleWorkers[i].stop(ctx)
+		// Set the last seen time to a past time
+		_, err := node2.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node2.workerTTL).UnixNano(), 10))
+		assert.NoError(t, err)
+	}
+
+	// Wait for the cleanup process to run
+	time.Sleep(3 * node2.workerTTL)
+
+	// Check if only the active worker remains on node1
+	workers1 := node1.activeWorkers()
+	assert.Len(t, workers1, 1, "There should be only one worker remaining on node1")
+	assert.Contains(t, workers1, activeWorker.ID, "The active worker should still exist on node1")
+
+	// Check if all workers have been removed from node2
+	workers2 := node2.activeWorkers()
+	assert.Len(t, workers2, 0, "There should be no workers remaining on node2")
+
+	// Verify that stale workers are not in the worker map of node2
+	for _, worker := range staleWorkers {
+		_, exists := node2.workerMap.Get(worker.ID)
+		assert.False(t, exists, "Stale worker %s should not exist in the worker map of node2", worker.ID)
+	}
+
+	// Cleanup
+	assert.NoError(t, node1.Shutdown(ctx))
+	assert.NoError(t, node2.Shutdown(ctx))
 }
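
A possible follow-up rather than part of the commit: both new tests wait with a fixed time.Sleep(3 * node.workerTTL) for the cleanup loop to fire. Since the file already imports testify's require, the wait in TestStaleWorkerCleanupInNode could instead poll until only the active worker remains, for example:

// Drop-in replacement for the sleep inside TestStaleWorkerCleanupInNode:
// pass as soon as only the active worker is left.
require.Eventually(t, func() bool {
	workers := node.activeWorkers()
	return len(workers) == 1 && workers[0] == activeWorker.ID
}, 3*node.workerTTL, node.workerTTL/10, "stale workers were not cleaned up")

Polling returns as soon as the condition holds and only fails after the full timeout, which tends to make the test both faster and less flaky.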

scripts/stop-redis

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+set -e
+
+# Get the root directory of the git repository
+GIT_ROOT=$(git rev-parse --show-toplevel)
+
+# Change to the git root directory
+pushd "${GIT_ROOT}"
+
+# Source common utilities and environment variables
+# shellcheck source=utils/common.sh
+source ./scripts/utils/common.sh
+source .env
+
+# Stop the Redis Docker container
+docker compose -p redis -f docker-compose/docker-compose-redis.yaml down
+
+# Return to the original directory
+popd

streaming/sink.go

Lines changed: 0 additions & 1 deletion
@@ -334,7 +334,6 @@ func (s *Sink) newConsumer(ctx context.Context, stream *Stream) (string, error)
 		}
 		return "", fmt.Errorf("failed to set sink keep-alive for new consumer %s: %w", consumer, err)
 	}
-	s.logger.Debug("created new consumer", "consumer", consumer)
 	return consumer, nil
 }
