@@ -541,7 +541,7 @@ func (node *Node) routeWorkerEvent(ctx context.Context, ev *streaming.Event) err
541541 node .logger .Debug ("routed" , "event" , ev .EventName , "id" , ev .ID , "worker" , wid , "worker-event-id" , eventID )
542542
543543 // Record the event in the pending events map for future ack.
544- node .pendingEvents [wid + ":" + eventID ] = ev
544+ node .pendingEvents [pendingEventKey ( wid , eventID ) ] = ev
545545
546546 return nil
547547}
@@ -583,7 +583,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
583583
584584 workerID , payload := unmarshalEnvelope (ev .Payload )
585585 ack := unmarshalAck (payload )
586- key := workerID + ":" + ack .EventID
586+ key := pendingEventKey ( workerID , ack .EventID )
587587 pending , ok := node .pendingEvents [key ]
588588 if ! ok {
589589 node .logger .Error (fmt .Errorf ("ackWorkerEvent: received unknown event %s from worker %s" , ack .EventID , workerID ))
@@ -619,7 +619,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
619619 }
620620 }
621621 for _ , key := range staleKeys {
622- node .logger .Error (fmt .Errorf ("ackWorkerEvent: stale event, removing from pending events" ), "event" , node .pendingEvents [key ].EventName , "id" , key )
622+ node .logger .Error (fmt .Errorf ("ackWorkerEvent: stale event, removing from pending events" ), "event" , node .pendingEvents [key ].EventName , "id" , node . pendingEvents [ key ]. ID , "since" , time . Since ( node . pendingEvents [ key ]. CreatedAt ()), "TTL" , 2 * node . pendingJobTTL )
623623 delete (node .pendingEvents , key )
624624 }
625625}
@@ -1028,3 +1028,9 @@ func poolStreamName(pool string) string {
10281028func nodeStreamName (pool , nodeID string ) string {
10291029 return fmt .Sprintf ("%s:node:%s" , pool , nodeID )
10301030}
1031+
1032+ // pendingEventKey computes the key of a pending event from a worker ID and a
1033+ // stream event ID.
1034+ func pendingEventKey (workerID , eventID string ) string {
1035+ return fmt .Sprintf ("%s:%s" , workerID , eventID )
1036+ }
0 commit comments