
Commit 2c189b7

Add more tests (#38)
1 parent 274a813 commit 2c189b7

6 files changed: +220 -31 lines


pool/node.go

Lines changed: 27 additions & 23 deletions
@@ -338,7 +338,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
     }
     if node.clientOnly {
         node.lock.Unlock()
-        return fmt.Errorf("Shutdown: pool %q is client-only", node.Name)
+        return fmt.Errorf("Shutdown: client-only node cannot shutdown worker pool")
     }
     node.lock.Unlock()
     node.logger.Info("shutting down")
@@ -401,31 +401,32 @@ func (node *Node) close(ctx context.Context, requeue bool) error {
     node.logger.Info("closing")
     node.closing = true
 
-    // Need to stop workers before requeueing jobs to prevent
-    // requeued jobs from being handled by this node.
-    var wg sync.WaitGroup
-    node.logger.Debug("stopping workers", "count", len(node.localWorkers))
-    for _, w := range node.localWorkers {
-        wg.Add(1)
-        go func(w *Worker) {
-            defer wg.Done()
-            w.stopAndWait(ctx)
-        }(w)
-    }
-    wg.Wait()
-    node.logger.Debug("workers stopped")
-
-    for _, w := range node.localWorkers {
-        if requeue {
-            if err := w.requeueJobs(ctx); err != nil {
-                node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", w.ID, err))
-                continue
+    if len(node.localWorkers) > 0 {
+        // Need to stop workers before requeueing jobs to prevent
+        // requeued jobs from being handled by this node.
+        var wg sync.WaitGroup
+        node.logger.Debug("stopping workers", "count", len(node.localWorkers))
+        for _, w := range node.localWorkers {
+            wg.Add(1)
+            go func(w *Worker) {
+                defer wg.Done()
+                w.stopAndWait(ctx)
+            }(w)
+        }
+        wg.Wait()
+        node.logger.Debug("workers stopped")
+        for _, w := range node.localWorkers {
+            if requeue {
+                if err := w.requeueJobs(ctx); err != nil {
+                    node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", w.ID, err))
+                    continue
+                }
             }
+            w.cleanup(ctx)
         }
-        w.cleanup(ctx)
+        node.localWorkers = nil
     }
 
-    node.localWorkers = nil
     if !node.clientOnly {
         node.poolSink.Close()
         node.tickerMap.Close()
@@ -532,7 +533,10 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
     key := workerID + ":" + ack.EventID
     pending, ok := node.pendingEvents[key]
     if !ok {
-        node.logger.Error(fmt.Errorf("ackWorkerEvent: received event %s from worker %s that was not dispatched", ack.EventID, workerID))
+        node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
+        if err := node.poolSink.Ack(ctx, pending); err != nil {
+            node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack unknown event: %w", err), "event", pending.EventName, "id", pending.ID)
+        }
         return
     }
 
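The refactored close path above stops every local worker concurrently and only then requeues jobs, so a requeued job cannot land back on a worker this node is still running; the new len(node.localWorkers) > 0 guard simply skips that whole sequence when the node never registered a worker. Below is a minimal standalone sketch of the same ordering, with a hypothetical worker type standing in for the pool package's Worker.

package main

import (
    "context"
    "fmt"
    "sync"
)

// worker is a stand-in for pool.Worker; stop and requeue play the roles of
// stopAndWait and requeueJobs in the diff above.
type worker struct{ id string }

func (w *worker) stop(context.Context)          { fmt.Println("stopped", w.id) }
func (w *worker) requeue(context.Context) error { fmt.Println("requeued", w.id); return nil }

func closeWorkers(ctx context.Context, workers []*worker) {
    if len(workers) == 0 {
        return // mirrors the new guard: nothing to stop or requeue
    }
    var wg sync.WaitGroup
    for _, w := range workers {
        wg.Add(1)
        go func(w *worker) { // loop variable passed explicitly, as in the original
            defer wg.Done()
            w.stop(ctx)
        }(w)
    }
    wg.Wait() // every worker has stopped before any job is requeued
    for _, w := range workers {
        if err := w.requeue(ctx); err != nil {
            continue // keep cleaning up the remaining workers
        }
    }
}

func main() {
    closeWorkers(context.Background(), []*worker{{id: "w1"}, {id: "w2"}})
}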
pool/node_test.go

Lines changed: 156 additions & 0 deletions
@@ -1,13 +1,17 @@
 package pool
 
 import (
+    "context"
+    "fmt"
     "strings"
     "testing"
     "time"
 
+    "github.com/redis/go-redis/v9"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
+    "goa.design/pulse/streaming"
     ptesting "goa.design/pulse/testing"
 )
 
@@ -90,6 +94,82 @@ func TestDispatchJobTwoWorkers(t *testing.T) {
     assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
 }
 
+func TestNotifyWorker(t *testing.T) {
+    testName := strings.Replace(t.Name(), "/", "_", -1)
+    ctx := ptesting.NewTestContext(t)
+    rdb := ptesting.NewRedisClient(t)
+    node := newTestNode(t, ctx, rdb, testName)
+    defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+    // Create a worker
+    worker := newTestWorker(t, ctx, node)
+
+    // Set up notification handling
+    jobKey := "test-job"
+    jobPayload := []byte("job payload")
+    notificationPayload := []byte("test notification")
+    ch := make(chan []byte, 1)
+    worker.handler.(*mockHandler).notifyFunc = func(key string, payload []byte) error {
+        assert.Equal(t, jobKey, key, "Received notification for the wrong key")
+        assert.Equal(t, notificationPayload, payload, "Received notification for the wrong payload")
+        close(ch)
+        return nil
+    }
+
+    // Dispatch a job to ensure the worker is assigned
+    require.NoError(t, node.DispatchJob(ctx, jobKey, jobPayload))
+
+    // Send a notification
+    err := node.NotifyWorker(ctx, jobKey, notificationPayload)
+    require.NoError(t, err, "Failed to send notification")
+
+    // Wait for the notification to be received
+    select {
+    case <-ch:
+    case <-time.After(max):
+        t.Fatal("Timeout waiting for notification to be received")
+    }
+
+    // Shutdown node
+    assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestNotifyWorkerNoHandler(t *testing.T) {
+    testName := strings.Replace(t.Name(), "/", "_", -1)
+    ctx, buf := ptesting.NewBufferedLogContext(t)
+    rdb := ptesting.NewRedisClient(t)
+    node := newTestNode(t, ctx, rdb, testName)
+    defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+    // Create a worker without NotificationHandler implementation
+    worker := newTestWorkerWithoutNotify(t, ctx, node)
+
+    // Dispatch a job to ensure the worker is assigned
+    jobKey := "test-job"
+    jobPayload := []byte("job payload")
+    require.NoError(t, node.DispatchJob(ctx, jobKey, jobPayload))
+
+    // Wait for the job to be received by the worker
+    require.Eventually(t, func() bool {
+        return len(worker.Jobs()) == 1
+    }, max, delay, "Job was not received by the worker")
+
+    // Send a notification
+    notificationPayload := []byte("test notification")
+    assert.NoError(t, node.NotifyWorker(ctx, jobKey, notificationPayload), "Failed to send notification")
+
+    // Check that an error was logged
+    assert.Eventually(t, func() bool {
+        return strings.Contains(buf.String(), "worker does not implement NotificationHandler, ignoring notification")
+    }, max, delay, "Expected error message was not logged within the timeout period")
+
+    // Ensure the worker is still functioning
+    assert.Len(t, worker.Jobs(), 1, "Worker should still have the job")
+
+    // Shutdown node
+    assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
 func TestRemoveWorkerThenShutdown(t *testing.T) {
     ctx := ptesting.NewTestContext(t)
     testName := strings.Replace(t.Name(), "/", "_", -1)
@@ -225,3 +305,79 @@ func TestNodeCloseAndRequeue(t *testing.T) {
     // Clean up
     require.NoError(t, node2.Shutdown(ctx), "Failed to shutdown node2")
 }
+
+func TestStaleEventsAreRemoved(t *testing.T) {
+    // Setup
+    ctx := ptesting.NewTestContext(t)
+    testName := strings.Replace(t.Name(), "/", "_", -1)
+    rdb := ptesting.NewRedisClient(t)
+    defer ptesting.CleanupRedis(t, rdb, true, testName)
+    node := newTestNode(t, ctx, rdb, testName)
+    defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+    // Add a stale event manually
+    staleEventID := fmt.Sprintf("%d-0", time.Now().Add(-2*node.pendingJobTTL).UnixNano()/int64(time.Millisecond))
+    staleEvent := &streaming.Event{
+        ID:        staleEventID,
+        EventName: "test-event",
+        Payload:   []byte("test-payload"),
+        Acker: &mockAcker{
+            XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+                return redis.NewIntCmd(ctx, 0)
+            },
+        },
+    }
+    node.pendingEvents["worker:stale-event-id"] = staleEvent
+
+    // Add a fresh event
+    freshEventID := fmt.Sprintf("%d-0", time.Now().Add(-time.Second).UnixNano()/int64(time.Millisecond))
+    freshEvent := &streaming.Event{
+        ID:        freshEventID,
+        EventName: "test-event",
+        Payload:   []byte("test-payload"),
+        Acker: &mockAcker{
+            XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+                return redis.NewIntCmd(ctx, 0)
+            },
+        },
+    }
+    node.pendingEvents["worker:fresh-event-id"] = freshEvent
+
+    // Create a mock event to trigger the ackWorkerEvent function
+    mockEvent := &streaming.Event{
+        ID:        "mock-event-id",
+        EventName: evAck,
+        Payload:   marshalEnvelope("worker", marshalAck(&ack{EventID: "mock-event-id"})),
+        Acker: &mockAcker{
+            XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+                return redis.NewIntCmd(ctx, 0)
+            },
+        },
+    }
+    node.pendingEvents["worker:mock-event-id"] = mockEvent
+
+    // Call ackWorkerEvent to trigger the stale event cleanup
+    node.ackWorkerEvent(ctx, mockEvent)
+
+    assert.Eventually(t, func() bool {
+        node.lock.Lock()
+        defer node.lock.Unlock()
+        _, exists := node.pendingEvents["worker:stale-event-id"]
+        return !exists
+    }, max, delay, "Stale event should have been removed")
+
+    assert.Eventually(t, func() bool {
+        node.lock.Lock()
+        defer node.lock.Unlock()
+        _, exists := node.pendingEvents["worker:fresh-event-id"]
+        return exists
+    }, max, delay, "Fresh event should still be present")
+}
+
+type mockAcker struct {
+    XAckFunc func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
+}
+
+func (m *mockAcker) XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+    return m.XAckFunc(ctx, streamKey, sinkName, ids...)
+}
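TestStaleEventsAreRemoved leans on the Redis stream ID format: entry IDs are "<milliseconds>-<sequence>", so a stale pending event can be fabricated by shifting the millisecond part into the past. The sketch below shows that encoding and one way a TTL check over such IDs could look; the actual cleanup performed inside ackWorkerEvent is not part of this diff, so the isStale helper is an illustrative assumption, not the package's code.

package main

import (
    "fmt"
    "strconv"
    "strings"
    "time"
)

// streamID builds a Redis stream entry ID ("<unix-ms>-0") for a given time,
// the same construction the test uses for its stale and fresh events.
func streamID(t time.Time) string {
    return fmt.Sprintf("%d-0", t.UnixNano()/int64(time.Millisecond))
}

// isStale reports whether the ID's timestamp is older than ttl. Illustrative
// only: the real sweep in ackWorkerEvent may differ.
func isStale(id string, ttl time.Duration) bool {
    ms, err := strconv.ParseInt(strings.SplitN(id, "-", 2)[0], 10, 64)
    if err != nil {
        return false
    }
    return time.Since(time.UnixMilli(ms)) > ttl
}

func main() {
    ttl := 2 * time.Minute
    fmt.Println(isStale(streamID(time.Now().Add(-2*ttl)), ttl))       // true
    fmt.Println(isStale(streamID(time.Now().Add(-time.Second)), ttl)) // false
}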

pool/testing.go

Lines changed: 27 additions & 3 deletions
@@ -14,7 +14,13 @@ import (
 type mockHandler struct {
     startFunc  func(job *Job) error
     stopFunc   func(key string) error
-    notifyFunc func(payload []byte) error
+    notifyFunc func(key string, payload []byte) error
+}
+
+// mockHandlerWithoutNotify is a mock handler that doesn't implement NotificationHandler
+type mockHandlerWithoutNotify struct {
+    startFunc func(job *Job) error
+    stopFunc  func(key string) error
 }
 
 const (
@@ -47,7 +53,20 @@ func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
     handler := &mockHandler{
         startFunc: func(job *Job) error { return nil },
         stopFunc:  func(key string) error { return nil },
-        notifyFunc: func(payload []byte) error { return nil },
+        notifyFunc: func(key string, payload []byte) error { return nil },
+    }
+    worker, err := node.AddWorker(ctx, handler)
+    require.NoError(t, err)
+    return worker
+}
+
+// newTestWorkerWithoutNotify creates a new Worker instance for testing purposes.
+// It sets up a mock handler without NotificationHandler for testing.
+func newTestWorkerWithoutNotify(t *testing.T, ctx context.Context, node *Node) *Worker {
+    t.Helper()
+    handler := &mockHandlerWithoutNotify{
+        startFunc: func(job *Job) error { return nil },
+        stopFunc:  func(key string) error { return nil },
     }
     worker, err := node.AddWorker(ctx, handler)
     require.NoError(t, err)
@@ -56,4 +75,9 @@ func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
 
 func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) }
 func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
-func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }
+func (w *mockHandler) HandleNotification(key string, payload []byte) error {
+    return w.notifyFunc(key, payload)
+}
+
+func (h *mockHandlerWithoutNotify) Start(job *Job) error { return h.startFunc(job) }
+func (h *mockHandlerWithoutNotify) Stop(key string) error { return h.stopFunc(key) }
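The two mocks differ only in whether they satisfy the notification interface that Worker.notify type-asserts against. Assuming NotificationHandler is declared in the pool package with the HandleNotification signature used above (the declaration itself is not in this diff), compile-time assertions spell out the difference:

// Assumed shape of the interface; the real declaration lives elsewhere in the pool package.
type NotificationHandler interface {
    HandleNotification(key string, payload []byte) error
}

var _ NotificationHandler = (*mockHandler)(nil) // compiles: HandleNotification is defined above

// The assertion below would not compile, which is why workers built by
// newTestWorkerWithoutNotify trigger the "does not implement NotificationHandler" error log:
// var _ NotificationHandler = (*mockHandlerWithoutNotify)(nil)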

pool/worker.go

Lines changed: 1 addition & 1 deletion
@@ -285,7 +285,7 @@ func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
     }
     nh, ok := w.handler.(NotificationHandler)
     if !ok {
-        w.logger.Debug("worker does not implement NotificationHandler, ignoring notification")
+        w.logger.Error(fmt.Errorf("worker does not implement NotificationHandler, ignoring notification"), "worker", w.ID)
         return nil
     }
     w.logger.Debug("handled notification", "payload", string(payload))

streaming/reader.go

Lines changed: 8 additions & 3 deletions
@@ -59,6 +59,11 @@ type (
         rdb *redis.Client
     }
 
+    // Acker is the interface used by events to acknowledge themselves.
+    Acker interface {
+        XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
+    }
+
     // Event is a stream event.
     Event struct {
         // ID is the unique event ID.
@@ -73,10 +78,10 @@ type (
         Topic string
         // Payload is the event payload.
         Payload []byte
+        // Acker is the redis client used to acknowledge events.
+        Acker Acker
         // streamKey is the Redis key of the stream.
         streamKey string
-        // rdb is the redis client.
-        rdb *redis.Client
     }
 )
 
@@ -314,7 +319,7 @@ func streamEvents(
             Topic:     topic,
             Payload:   []byte(event.Values[payloadKey].(string)),
             streamKey: streamKey,
-            rdb:       rdb,
+            Acker:     rdb,
         }
         if eventFilter != nil && !eventFilter(ev) {
             logger.Debug("event filtered", "event", ev.EventName, "id", ev.ID, "stream", streamName)
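Swapping the concrete *redis.Client field for the exported Acker interface is what makes acknowledgment injectable: streamEvents keeps assigning the redis client (whose XAck method already matches the interface), while tests can drop in a stub such as the mockAcker added in pool/node_test.go. A brief sketch of both sides, assuming only the go-redis v9 XAck signature:

package streaming_test // illustrative sketch only

import (
    "context"

    "github.com/redis/go-redis/v9"

    "goa.design/pulse/streaming"
)

// *redis.Client provides XAck(ctx, stream, group string, ids ...string) *redis.IntCmd,
// so it satisfies streaming.Acker without an adapter, exactly as streamEvents relies on.
var _ streaming.Acker = (*redis.Client)(nil)

// recordingAcker is a test stub that records acknowledged IDs instead of calling Redis.
type recordingAcker struct{ acked []string }

func (a *recordingAcker) XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
    a.acked = append(a.acked, ids...)
    return redis.NewIntCmd(ctx, int64(len(ids)))
}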

streaming/sink.go

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ func (s *Sink) Unsubscribe(c <-chan *Event) {
 
 // Ack acknowledges the event.
 func (s *Sink) Ack(ctx context.Context, e *Event) error {
-    err := e.rdb.XAck(ctx, e.streamKey, e.SinkName, e.ID).Err()
+    err := e.Acker.XAck(ctx, e.streamKey, e.SinkName, e.ID).Err()
     if err != nil {
         s.logger.Error(err, "ack", e.ID, "stream", e.StreamName)
         return err
