@@ -22,6 +22,170 @@ const (
2222 max = time .Second
2323)
2424
25+ func TestWorkers (t * testing.T ) {
26+ testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
27+ ctx := ptesting .NewTestContext (t )
28+ rdb := ptesting .NewRedisClient (t )
29+ node := newTestNode (t , ctx , rdb , testName )
30+ defer ptesting .CleanupRedis (t , rdb , true , testName )
31+
32+ // Create a few workers
33+ worker1 := newTestWorker (t , ctx , node )
34+ worker2 := newTestWorker (t , ctx , node )
35+ worker3 := newTestWorker (t , ctx , node )
36+
37+ // Get the list of workers
38+ workers := node .Workers ()
39+
40+ // Check if the number of workers is correct
41+ assert .Equal (t , 3 , len (workers ), "Expected 3 workers" )
42+
43+ // Check if all created workers are in the list
44+ expectedWorkers := []string {worker1 .ID , worker2 .ID , worker3 .ID }
45+ actualWorkers := make ([]string , len (workers ))
46+ for i , w := range workers {
47+ actualWorkers [i ] = w .ID
48+ }
49+ assert .ElementsMatch (t , expectedWorkers , actualWorkers , "The list of workers should contain all created workers" )
50+
51+ // Shutdown node
52+ assert .NoError (t , node .Shutdown (ctx ), "Failed to shutdown node" )
53+ }
54+
55+ func TestPoolWorkers (t * testing.T ) {
56+ testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
57+ ctx := ptesting .NewTestContext (t )
58+ rdb := ptesting .NewRedisClient (t )
59+ node := newTestNode (t , ctx , rdb , testName )
60+ defer ptesting .CleanupRedis (t , rdb , true , testName )
61+
62+ // Create workers on the current node
63+ worker1 := newTestWorker (t , ctx , node )
64+ worker2 := newTestWorker (t , ctx , node )
65+
66+ // Create a worker on a different node
67+ otherNode := newTestNode (t , ctx , rdb , testName )
68+ worker3 := newTestWorker (t , ctx , otherNode )
69+ defer func () { assert .NoError (t , otherNode .Shutdown (ctx )) }()
70+
71+ // Check if the number of workers is correct (should include workers from all nodes)
72+ assert .Eventually (t , func () bool {
73+ return len (node .PoolWorkers ()) == 3
74+ }, max , delay , "Expected 3 workers in the pool" )
75+
76+ // Check if all created workers are in the list
77+ poolWorkers := node .PoolWorkers ()
78+ workerIDs := make ([]string , len (poolWorkers ))
79+ for i , w := range poolWorkers {
80+ workerIDs [i ] = w .ID
81+ }
82+
83+ expectedWorkerIDs := []string {worker1 .ID , worker2 .ID , worker3 .ID }
84+ assert .ElementsMatch (t , expectedWorkerIDs , workerIDs , "Not all expected workers were found in the pool" )
85+
86+ // Shutdown nodes
87+ assert .NoError (t , node .Shutdown (ctx ), "Failed to shutdown node" )
88+ }
89+
90+ func TestJobKeys (t * testing.T ) {
91+ testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
92+ ctx := ptesting .NewTestContext (t )
93+ rdb := ptesting .NewRedisClient (t )
94+ defer ptesting .CleanupRedis (t , rdb , true , testName )
95+
96+ node1 := newTestNode (t , ctx , rdb , testName )
97+ node2 := newTestNode (t , ctx , rdb , testName )
98+ newTestWorker (t , ctx , node1 )
99+ newTestWorker (t , ctx , node2 )
100+ defer func () {
101+ assert .NoError (t , node1 .Shutdown (ctx ))
102+ assert .NoError (t , node2 .Shutdown (ctx ))
103+ }()
104+
105+ // Configure nodes to send jobs to specific workers
106+ node1 .h , node2 .h = & ptesting.Hasher {Index : 0 }, & ptesting.Hasher {Index : 1 }
107+
108+ jobs := []struct {
109+ key string
110+ payload []byte
111+ }{
112+ {key : "job1" , payload : []byte ("payload1" )},
113+ {key : "job2" , payload : []byte ("payload2" )},
114+ {key : "job3" , payload : []byte ("payload3" )},
115+ {key : "job4" , payload : []byte ("payload4" )},
116+ }
117+
118+ for _ , job := range jobs {
119+ assert .NoError (t , node1 .DispatchJob (ctx , job .key , job .payload ), fmt .Sprintf ("Failed to dispatch job: %s" , job .key ))
120+ }
121+
122+ // Get job keys from the pool and check if all dispatched job keys are present
123+ var allJobKeys []string
124+ assert .Eventually (t , func () bool {
125+ allJobKeys = node1 .JobKeys ()
126+ return len (jobs ) == len (allJobKeys )
127+ }, max , delay , fmt .Sprintf ("Number of job keys doesn't match the number of dispatched jobs: %d != %d" , len (jobs ), len (allJobKeys )))
128+ for _ , job := range jobs {
129+ assert .Contains (t , allJobKeys , job .key , fmt .Sprintf ("Job key %s not found in JobKeys" , job .key ))
130+ }
131+
132+ // Dispatch a job with an existing key to node1
133+ assert .NoError (t , node1 .DispatchJob (ctx , "job1" , []byte ("updated payload" )), "Failed to dispatch job with existing key" )
134+
135+ // Check that the number of job keys hasn't changed
136+ updatedAllJobKeys := node1 .JobKeys ()
137+ assert .Equal (t , len (allJobKeys ), len (updatedAllJobKeys ), "Number of job keys shouldn't change when updating an existing job" )
138+ }
139+
140+ func TestJobPayload (t * testing.T ) {
141+ testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
142+ ctx := ptesting .NewTestContext (t )
143+ rdb := ptesting .NewRedisClient (t )
144+ defer ptesting .CleanupRedis (t , rdb , true , testName )
145+
146+ node := newTestNode (t , ctx , rdb , testName )
147+ newTestWorker (t , ctx , node )
148+ defer func () { assert .NoError (t , node .Shutdown (ctx )) }()
149+
150+ tests := []struct {
151+ name string
152+ key string
153+ payload []byte
154+ }{
155+ {"job with payload" , "job1" , []byte ("payload1" )},
156+ {"job without payload" , "job2" , nil },
157+ }
158+
159+ for _ , tt := range tests {
160+ t .Run (tt .name , func (t * testing.T ) {
161+ assert .NoError (t , node .DispatchJob (ctx , tt .key , tt .payload ), "Failed to dispatch job" )
162+
163+ // Check if job payload is correct
164+ assert .Eventually (t , func () bool {
165+ payload , ok := node .JobPayload (tt .key )
166+ fmt .Println (payload , ok )
167+ fmt .Println (tt .payload )
168+ return ok && assert .Equal (t , tt .payload , payload )
169+ }, max , delay , fmt .Sprintf ("Failed to get correct payload for job %s" , tt .key ))
170+ })
171+ }
172+
173+ // Test non-existent job
174+ payload , ok := node .JobPayload ("non-existent-job" )
175+ assert .False (t , ok , "Expected false for non-existent job" )
176+ assert .Nil (t , payload , "Expected nil payload for non-existent job" )
177+
178+ // Update existing job
179+ updatedPayload := []byte ("updated payload" )
180+ assert .NoError (t , node .DispatchJob (ctx , "job1" , updatedPayload ), "Failed to update existing job" )
181+
182+ // Check if the payload was updated
183+ assert .Eventually (t , func () bool {
184+ payload , ok := node .JobPayload ("job1" )
185+ return ok && assert .Equal (t , updatedPayload , payload , "Payload was not updated correctly" )
186+ }, max , delay , "Failed to get updated payload for job" )
187+ }
188+
25189func TestDispatchJobOneWorker (t * testing.T ) {
26190 testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
27191 ctx := ptesting .NewTestContext (t )
@@ -306,6 +470,34 @@ func TestNodeCloseAndRequeue(t *testing.T) {
306470 require .NoError (t , node2 .Shutdown (ctx ), "Failed to shutdown node2" )
307471}
308472
473+ func TestAckWorkerEventWithMissingPendingEvent (t * testing.T ) {
474+ // Setup
475+ ctx := ptesting .NewTestContext (t )
476+ testName := strings .Replace (t .Name (), "/" , "_" , - 1 )
477+ rdb := ptesting .NewRedisClient (t )
478+ defer ptesting .CleanupRedis (t , rdb , true , testName )
479+ node := newTestNode (t , ctx , rdb , testName )
480+ defer func () { assert .NoError (t , node .Shutdown (ctx )) }()
481+
482+ // Create a mock event with a non-existent pending event ID
483+ mockEvent := & streaming.Event {
484+ ID : "non-existent-event-id" ,
485+ EventName : evAck ,
486+ Payload : marshalEnvelope ("worker" , marshalAck (& ack {EventID : "non-existent-event-id" })),
487+ Acker : & mockAcker {
488+ XAckFunc : func (ctx context.Context , streamKey , sinkName string , ids ... string ) * redis.IntCmd {
489+ return redis .NewIntCmd (ctx , 0 )
490+ },
491+ },
492+ }
493+
494+ // Call ackWorkerEvent with the mock event
495+ node .ackWorkerEvent (ctx , mockEvent )
496+
497+ // Verify that no panic occurred and the function completed successfully
498+ assert .True (t , true , "ackWorkerEvent should complete without panic" )
499+ }
500+
309501func TestStaleEventsAreRemoved (t * testing.T ) {
310502 // Setup
311503 ctx := ptesting .NewTestContext (t )
0 commit comments