|
42 | 42 | workerTTL time.Duration // Worker considered dead if keep-alive not updated after this duration |
43 | 43 | workerShutdownTTL time.Duration // Worker considered dead if not shutdown after this duration |
44 | 44 | pendingJobTTL time.Duration // Job lease expires if not acked after this duration |
| 45 | + ackGracePeriod time.Duration // Wait for return status up to this duration |
45 | 46 | logger pulse.Logger |
46 | 47 | h hasher |
47 | 48 | stop chan struct{} // closed when node is stopped |
@@ -191,6 +192,7 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp |
191 | 192 | workerTTL: o.workerTTL, |
192 | 193 | workerShutdownTTL: o.workerShutdownTTL, |
193 | 194 | pendingJobTTL: o.pendingJobTTL, |
| 195 | + ackGracePeriod: o.ackGracePeriod, |
194 | 196 | h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))}, |
195 | 197 | stop: make(chan struct{}), |
196 | 198 | rdb: rdb, |
@@ -313,16 +315,26 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e |
313 | 315 | node.pendingJobs[eventID] = cherr |
314 | 316 | node.lock.Unlock() |
315 | 317 |
|
316 | | - // Wait for return status. |
| 318 | + // Wait for return status up to ack grace period. |
| 319 | + timer := time.NewTimer(2 * node.ackGracePeriod) |
| 320 | + defer timer.Stop() |
| 321 | + |
317 | 322 | select { |
318 | 323 | case err = <-cherr: |
| 324 | + case <-timer.C: |
| 325 | + err = fmt.Errorf("DispatchJob: job %q timed out, TTL: %v", key, 2*node.ackGracePeriod) |
319 | 326 | case <-ctx.Done(): |
320 | 327 | err = ctx.Err() |
321 | 328 | } |
322 | 329 |
|
| 330 | + node.lock.Lock() |
| 331 | + delete(node.pendingJobs, eventID) |
323 | 332 | close(cherr) |
| 333 | + node.lock.Unlock() |
324 | 334 | if err == nil { |
325 | 335 | node.logger.Info("dispatched", "key", key) |
| 336 | + } else { |
| 337 | + node.logger.Error(fmt.Errorf("DispatchJob: failed to dispatch job: %w", err), "key", key) |
326 | 338 | } |
327 | 339 | return err |
328 | 340 | } |
@@ -636,9 +648,9 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) { |
636 | 648 | return |
637 | 649 | } |
638 | 650 | node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID) |
639 | | - delete(node.pendingJobs, ack.EventID) |
640 | 651 | if cherr == nil { |
641 | 652 | // Event was requeued. |
| 653 | + delete(node.pendingJobs, ack.EventID) |
642 | 654 | return |
643 | 655 | } |
644 | 656 | var err error |
|
0 commit comments