@@ -27,8 +27,8 @@ import (
2727type (
2828 // Node is a pool of workers.
2929 Node struct {
30- Name string
3130 NodeID string
31+ PoolName string
3232 poolStream * streaming.Stream // pool event stream for dispatching jobs
3333 poolSink * streaming.Sink // pool event sink
3434 nodeStream * streaming.Stream // node event stream for receiving worker events
@@ -93,14 +93,14 @@ const (
9393// The options WithClientOnly can be used to create a node that can only be used
9494// to dispatch jobs. Such a node does not route or process jobs in the
9595// background.
96- func AddNode (ctx context.Context , name string , rdb * redis.Client , opts ... NodeOption ) (* Node , error ) {
96+ func AddNode (ctx context.Context , poolName string , rdb * redis.Client , opts ... NodeOption ) (* Node , error ) {
9797 o := parseOptions (opts ... )
9898 logger := o .logger
9999 nodeID := ulid .Make ().String ()
100100 if logger == nil {
101101 logger = pulse .NoopLogger ()
102102 } else {
103- logger = logger .WithPrefix ("pool" , name , "node" , nodeID )
103+ logger = logger .WithPrefix ("pool" , poolName , "node" , nodeID )
104104 }
105105 logger .Info ("options" ,
106106 "client_only" , o .clientOnly ,
@@ -110,18 +110,18 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
110110 "pending_job_ttl" , o .pendingJobTTL ,
111111 "job_sink_block_duration" , o .jobSinkBlockDuration ,
112112 "ack_grace_period" , o .ackGracePeriod )
113- wsm , err := rmap .Join (ctx , shutdownMapName (name ), rdb , rmap .WithLogger (logger ))
113+ wsm , err := rmap .Join (ctx , shutdownMapName (poolName ), rdb , rmap .WithLogger (logger ))
114114 if err != nil {
115- return nil , fmt .Errorf ("AddNode: failed to join shutdown replicated map %q: %w" , shutdownMapName (name ), err )
115+ return nil , fmt .Errorf ("AddNode: failed to join shutdown replicated map %q: %w" , shutdownMapName (poolName ), err )
116116 }
117117 if wsm .Len () > 0 {
118- return nil , fmt .Errorf ("AddNode: pool %q is shutting down" , name )
118+ return nil , fmt .Errorf ("AddNode: pool %q is shutting down" , poolName )
119119 }
120- poolStream , err := streaming .NewStream (poolStreamName (name ), rdb ,
120+ poolStream , err := streaming .NewStream (poolStreamName (poolName ), rdb ,
121121 soptions .WithStreamMaxLen (o .maxQueuedJobs ),
122122 soptions .WithStreamLogger (logger ))
123123 if err != nil {
124- return nil , fmt .Errorf ("AddNode: failed to create pool job stream %q: %w" , poolStreamName (name ), err )
124+ return nil , fmt .Errorf ("AddNode: failed to create pool job stream %q: %w" , poolStreamName (poolName ), err )
125125 }
126126 var (
127127 wm * rmap.Map
@@ -134,47 +134,47 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
134134 nodeReader * streaming.Reader
135135 )
136136 if ! o .clientOnly {
137- wm , err = rmap .Join (ctx , workerMapName (name ), rdb , rmap .WithLogger (logger ))
137+ wm , err = rmap .Join (ctx , workerMapName (poolName ), rdb , rmap .WithLogger (logger ))
138138 if err != nil {
139- return nil , fmt .Errorf ("AddNode: failed to join pool workers replicated map %q: %w" , workerMapName (name ), err )
139+ return nil , fmt .Errorf ("AddNode: failed to join pool workers replicated map %q: %w" , workerMapName (poolName ), err )
140140 }
141141 workerIDs := wm .Keys ()
142142 logger .Info ("joined" , "workers" , workerIDs )
143- jm , err = rmap .Join (ctx , jobsMapName (name ), rdb , rmap .WithLogger (logger ))
143+ jm , err = rmap .Join (ctx , jobsMapName (poolName ), rdb , rmap .WithLogger (logger ))
144144 if err != nil {
145- return nil , fmt .Errorf ("AddNode: failed to join pool jobs replicated map %q: %w" , jobsMapName (name ), err )
145+ return nil , fmt .Errorf ("AddNode: failed to join pool jobs replicated map %q: %w" , jobsMapName (poolName ), err )
146146 }
147- jpm , err = rmap .Join (ctx , jobPayloadsMapName (name ), rdb , rmap .WithLogger (logger ))
147+ jpm , err = rmap .Join (ctx , jobPayloadsMapName (poolName ), rdb , rmap .WithLogger (logger ))
148148 if err != nil {
149- return nil , fmt .Errorf ("AddNode: failed to join pool job payloads replicated map %q: %w" , jobPayloadsMapName (name ), err )
149+ return nil , fmt .Errorf ("AddNode: failed to join pool job payloads replicated map %q: %w" , jobPayloadsMapName (poolName ), err )
150150 }
151- km , err = rmap .Join (ctx , keepAliveMapName (name ), rdb , rmap .WithLogger (logger ))
151+ km , err = rmap .Join (ctx , keepAliveMapName (poolName ), rdb , rmap .WithLogger (logger ))
152152 if err != nil {
153- return nil , fmt .Errorf ("AddNode: failed to join pool keep-alive replicated map %q: %w" , keepAliveMapName (name ), err )
153+ return nil , fmt .Errorf ("AddNode: failed to join pool keep-alive replicated map %q: %w" , keepAliveMapName (poolName ), err )
154154 }
155- tm , err = rmap .Join (ctx , tickerMapName (name ), rdb , rmap .WithLogger (logger ))
155+ tm , err = rmap .Join (ctx , tickerMapName (poolName ), rdb , rmap .WithLogger (logger ))
156156 if err != nil {
157- return nil , fmt .Errorf ("AddNode: failed to join pool ticker replicated map %q: %w" , tickerMapName (name ), err )
157+ return nil , fmt .Errorf ("AddNode: failed to join pool ticker replicated map %q: %w" , tickerMapName (poolName ), err )
158158 }
159159 poolSink , err = poolStream .NewSink (ctx , "events" ,
160160 soptions .WithSinkBlockDuration (o .jobSinkBlockDuration ),
161161 soptions .WithSinkAckGracePeriod (o .ackGracePeriod ))
162162 if err != nil {
163- return nil , fmt .Errorf ("AddNode: failed to create events sink for stream %q: %w" , poolStreamName (name ), err )
163+ return nil , fmt .Errorf ("AddNode: failed to create events sink for stream %q: %w" , poolStreamName (poolName ), err )
164164 }
165165 }
166- nodeStream , err = streaming .NewStream (nodeStreamName (name , nodeID ), rdb , soptions .WithStreamLogger (logger ))
166+ nodeStream , err = streaming .NewStream (nodeStreamName (poolName , nodeID ), rdb , soptions .WithStreamLogger (logger ))
167167 if err != nil {
168- return nil , fmt .Errorf ("AddNode: failed to create node event stream %q: %w" , nodeStreamName (name , nodeID ), err )
168+ return nil , fmt .Errorf ("AddNode: failed to create node event stream %q: %w" , nodeStreamName (poolName , nodeID ), err )
169169 }
170170 nodeReader , err = nodeStream .NewReader (ctx , soptions .WithReaderBlockDuration (o .jobSinkBlockDuration ), soptions .WithReaderStartAtOldest ())
171171 if err != nil {
172- return nil , fmt .Errorf ("AddNode: failed to create node event reader for stream %q: %w" , nodeStreamName (name , nodeID ), err )
172+ return nil , fmt .Errorf ("AddNode: failed to create node event reader for stream %q: %w" , nodeStreamName (poolName , nodeID ), err )
173173 }
174174
175175 p := & Node {
176- Name : name ,
177176 NodeID : nodeID ,
177+ PoolName : poolName ,
178178 keepAliveMap : km ,
179179 workerMap : wm ,
180180 jobsMap : jm ,
@@ -225,10 +225,10 @@ func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, e
225225 node .lock .Lock ()
226226 defer node .lock .Unlock ()
227227 if node .closing {
228- return nil , fmt .Errorf ("AddWorker: pool %q is closed" , node .Name )
228+ return nil , fmt .Errorf ("AddWorker: pool %q is closed" , node .PoolName )
229229 }
230230 if node .clientOnly {
231- return nil , fmt .Errorf ("AddWorker: pool %q is client-only" , node .Name )
231+ return nil , fmt .Errorf ("AddWorker: pool %q is client-only" , node .PoolName )
232232 }
233233 w , err := newWorker (ctx , node , handler )
234234 if err != nil {
@@ -303,7 +303,7 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
303303 node .lock .Lock ()
304304 if node .closing {
305305 node .lock .Unlock ()
306- return fmt .Errorf ("DispatchJob: pool %q is closed" , node .Name )
306+ return fmt .Errorf ("DispatchJob: pool %q is closed" , node .PoolName )
307307 }
308308 job := marshalJob (& Job {Key : key , Payload : payload , CreatedAt : time .Now (), NodeID : node .NodeID })
309309 eventID , err := node .poolStream .Add (ctx , evStartJob , job )
@@ -344,7 +344,7 @@ func (node *Node) StopJob(ctx context.Context, key string) error {
344344 node .lock .Lock ()
345345 defer node .lock .Unlock ()
346346 if node .closing {
347- return fmt .Errorf ("StopJob: pool %q is closed" , node .Name )
347+ return fmt .Errorf ("StopJob: pool %q is closed" , node .PoolName )
348348 }
349349 if _ , err := node .poolStream .Add (ctx , evStopJob , marshalJobKey (key )); err != nil {
350350 return fmt .Errorf ("StopJob: failed to add stop job to stream %q: %w" , node .poolStream .Name , err )
@@ -383,7 +383,7 @@ func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte)
383383 node .lock .Lock ()
384384 defer node .lock .Unlock ()
385385 if node .closing {
386- return fmt .Errorf ("NotifyWorker: pool %q is closed" , node .Name )
386+ return fmt .Errorf ("NotifyWorker: pool %q is closed" , node .PoolName )
387387 }
388388 if _ , err := node .poolStream .Add (ctx , evNotify , marshalNotification (key , payload )); err != nil {
389389 return fmt .Errorf ("NotifyWorker: failed to add notification to stream %q: %w" , node .poolStream .Name , err )
@@ -421,7 +421,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
421421 }
422422
423423 // Now clean up the shutdown replicated map.
424- wsm , err := rmap .Join (ctx , shutdownMapName (node .Name ), node .rdb , rmap .WithLogger (node .logger ))
424+ wsm , err := rmap .Join (ctx , shutdownMapName (node .PoolName ), node .rdb , rmap .WithLogger (node .logger ))
425425 if err != nil {
426426 node .logger .Error (fmt .Errorf ("Shutdown: failed to join shutdown map for cleanup: %w" , err ))
427427 }
@@ -535,7 +535,7 @@ func (node *Node) routeWorkerEvent(ctx context.Context, ev *streaming.Event) err
535535 key := unmarshalJobKey (ev .Payload )
536536 activeWorkers := node .activeWorkers ()
537537 if len (activeWorkers ) == 0 {
538- return fmt .Errorf ("routeWorkerEvent: no active worker in pool %q" , node .Name )
538+ return fmt .Errorf ("routeWorkerEvent: no active worker in pool %q" , node .PoolName )
539539 }
540540 wid := activeWorkers [node .h .Hash (key , int64 (len (activeWorkers )))]
541541
@@ -606,14 +606,14 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
606606 // dispatched the job.
607607 if pending .EventName == evStartJob {
608608 _ , nodeID := unmarshalJobKeyAndNodeID (pending .Payload )
609- stream , err := streaming .NewStream (nodeStreamName (node .Name , nodeID ), node .rdb , soptions .WithStreamLogger (node .logger ))
609+ stream , err := streaming .NewStream (nodeStreamName (node .PoolName , nodeID ), node .rdb , soptions .WithStreamLogger (node .logger ))
610610 if err != nil {
611- node .logger .Error (fmt .Errorf ("ackWorkerEvent: failed to create node event stream %q: %w" , nodeStreamName (node .Name , nodeID ), err ))
611+ node .logger .Error (fmt .Errorf ("ackWorkerEvent: failed to create node event stream %q: %w" , nodeStreamName (node .PoolName , nodeID ), err ))
612612 return
613613 }
614614 ack .EventID = pending .ID
615615 if _ , err := stream .Add (ctx , evDispatchReturn , marshalAck (ack ), soptions .WithOnlyIfStreamExists ()); err != nil {
616- node .logger .Error (fmt .Errorf ("ackWorkerEvent: failed to dispatch return to stream %q: %w" , nodeStreamName (node .Name , nodeID ), err ))
616+ node .logger .Error (fmt .Errorf ("ackWorkerEvent: failed to dispatch return to stream %q: %w" , nodeStreamName (node .PoolName , nodeID ), err ))
617617 }
618618 }
619619
0 commit comments