From d124e6b231900201c02ab29deec52a7b7b39899f Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Mon, 22 Jun 2026 20:31:51 -0700 Subject: [PATCH] Fix duplicate cron trigger events when callback blocks on I/O Production workflows with a single cron trigger were occasionally delivering the same trigger event ID twice, causing the workflow engine to log "Skipping duplicate execution" for a delayed second delivery. Root cause: gocron allows overlapping task runs by default. The cron callback read trigger.nextRun at the start but only advanced it after blocking work (org resolver lookup, TriggerExecutionStarted emission). If the callback blocked long enough for the next scheduled tick to fire, a concurrent gocron invocation read the same stale nextRun and emitted a duplicate event with the identical scheduled execution time / legacy execution ID. Fix: - Advance nextRun via job.NextRun() immediately after capturing the scheduled execution time, before any blocking I/O in the callback. - Register cron jobs with gocron.WithSingletonMode(LimitModeWait) so a second invocation cannot run concurrently while the first is still in progress; overdue ticks queue and run sequentially afterward. Add TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks with a blocking org resolver to reproduce the production scenario and assert that no duplicate trigger event IDs are delivered. 
Co-authored-by: Cursor --- cron/trigger/trigger.go | 42 ++++++++---- cron/trigger/trigger_test.go | 121 +++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 14 deletions(-) diff --git a/cron/trigger/trigger.go b/cron/trigger/trigger.go index d78adf404..e5a649e0c 100644 --- a/cron/trigger/trigger.go +++ b/cron/trigger/trigger.go @@ -248,6 +248,28 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat scheduledExecutionTimeUTC := trigger.nextRun.UTC() currentTimeUTC := s.clock.Now().UTC() + nextExecutionTime, nextRunErr := job.NextRun() + if nextRunErr != nil { + // .NextRun() will error if the job no longer exists + // or if there is no next run to schedule, which shouldn't happen with cron jobs + s.lggr.Errorw("task callback failed to schedule next run", "triggerID", triggerID) + } + + // Advance nextRun before blocking I/O so overlapping task invocations + // cannot reuse the same scheduled execution time. + muCh.Lock() + if callbackCh == nil { + muCh.Unlock() + return // unregistered already + } + s.triggers.Write(triggerID, cronTrigger{ + job: job, + nextRun: nextExecutionTime, + workflowID: trigger.workflowID, + close: closeCh, + }) + muCh.Unlock() + response := createTriggerResponse(scheduledExecutionTimeUTC) displayWorkflowName := metadata.DecodedWorkflowName @@ -309,24 +331,11 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat s.lggr.Debugw("task callback sending trigger response", "executionID", workflowExecutionID, "isLegacyExecutionID", isLegacyExecutionID, "triggerID", triggerID, "scheduledExecTimeUTC", scheduledExecutionTimeUTC.Format(time.RFC3339Nano), "actualExecTimeUTC", currentTimeUTC.Format(time.RFC3339Nano)) - nextExecutionTime, nextRunErr := job.NextRun() - if nextRunErr != nil { - // .NextRun() will error if the job no longer exists - // or if there is no next run to schedule, which shouldn't happen with cron jobs - s.lggr.Errorw("task callback failed to schedule 
next run", "executionID", workflowExecutionID, "triggerID", triggerID) - } - muCh.RLock() defer muCh.RUnlock() if callbackCh == nil { return // unregistered already } - s.triggers.Write(triggerID, cronTrigger{ - job: job, - nextRun: nextExecutionTime, - workflowID: metadata.WorkflowID, - close: closeCh, - }) select { case callbackCh <- response: @@ -349,7 +358,12 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat } // If service has already started, job will be scheduled immediately - job, err = s.scheduler.NewJob(jobDef, task, gocron.WithName(triggerID)) + job, err = s.scheduler.NewJob( + jobDef, + task, + gocron.WithName(triggerID), + gocron.WithSingletonMode(gocron.LimitModeWait), + ) if err != nil { s.lggr.Errorw("failed to create new job", "err", err) return nil, caperrors.NewPublicSystemError(fmt.Errorf("RegisterTrigger failed to create new job: %s", err), caperrors.Internal) diff --git a/cron/trigger/trigger_test.go b/cron/trigger/trigger_test.go index 49aca17d9..1bcdc3239 100644 --- a/cron/trigger/trigger_test.go +++ b/cron/trigger/trigger_test.go @@ -8,6 +8,7 @@ import ( "math" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -1258,6 +1259,126 @@ func TestCronTrigger_MultiTriggerFlag_ExecutionIDPaths(t *testing.T) { }) } +// blockingOrgResolver blocks the first Get call until release is closed, simulating +// slow org-resolver I/O inside the cron task callback before the trigger event is sent. 
+type blockingOrgResolver struct { + firstBlocked chan struct{} + release chan struct{} + getCount atomic.Int32 + once sync.Once +} + +func (r *blockingOrgResolver) Get(ctx context.Context, owner string) (string, error) { + if r.getCount.Add(1) == 1 { + r.once.Do(func() { close(r.firstBlocked) }) + select { + case <-r.release: + case <-ctx.Done(): + return "", ctx.Err() + } + } + return "test-org-id", nil +} + +func (r *blockingOrgResolver) Start(ctx context.Context) error { return nil } +func (r *blockingOrgResolver) Close() error { return nil } +func (r *blockingOrgResolver) HealthReport() map[string]error { return map[string]error{} } +func (r *blockingOrgResolver) Ready() error { return nil } +func (r *blockingOrgResolver) Name() string { return "BlockingOrgResolver" } + +var _ orgresolver.OrgResolver = (*blockingOrgResolver)(nil) + +// TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks ensures a cron trigger does not +// deliver duplicate trigger event IDs when the task callback blocks (e.g. on org resolver I/O). 
+func TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks(t *testing.T) { + const testWorkflowID = "00c0de5771c4f20ed242bedd2de9d3fdf9a0fbe5d03aefca9bdb29013a28de36" + const testTriggerID = testWorkflowID + "|cron-0" + + releaseFirst := make(chan struct{}) + orgResolver := &blockingOrgResolver{ + firstBlocked: make(chan struct{}), + release: releaseFirst, + } + + triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) + require.NoError(t, err) + + ts, err := NewTriggerService(logger.Nop(), clockwork.NewRealClock(), limits.Factory{}) + require.NoError(t, err) + err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ + Config: string(triggerConfig), + OrgResolver: orgResolver, + }) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, ts.Close()) }) + + metadata := capabilities.RequestMetadata{ + WorkflowID: testWorkflowID, + WorkflowOwner: "0x0000000000000000000000000000000000000001", + ReferenceID: workflows.GetTriggerReferenceID(0), + } + callback, capErr := ts.RegisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + eventsCh := make(chan capabilities.TriggerAndId[*crontypedapi.Payload], 4) + collectDone := make(chan struct{}) + go func() { + defer close(collectDone) + for event := range callback { + eventsCh <- event + } + close(eventsCh) + }() + + select { + case <-orgResolver.firstBlocked: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for first cron task to block in org resolver") + } + + // While the first task is still blocked, no trigger event should be delivered. 
+ select { + case event := <-eventsCh: + t.Fatalf("unexpected trigger event while callback is blocked: %s", event.Id) + case <-time.After(1 * time.Second): + } + + close(releaseFirst) + + var events []capabilities.TriggerAndId[*crontypedapi.Payload] + select { + case event := <-eventsCh: + events = append(events, event) + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for trigger event after blocked callback completes") + } + + // Fixed window: keep reading so duplicate IDs cannot slip through after the first event. + collectUntil := time.After(2 * time.Second) +collectLoop: + for { + select { + case event, ok := <-eventsCh: + if !ok { + break collectLoop + } + events = append(events, event) + case <-collectUntil: + break collectLoop + } + } + + seenEventIDs := make(map[string]struct{}, len(events)) + for _, event := range events { + _, duplicate := seenEventIDs[event.Id] + require.False(t, duplicate, "trigger event ID %s was delivered more than once", event.Id) + seenEventIDs[event.Id] = struct{}{} + } + + require.Nil(t, ts.UnregisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond})) + <-collectDone +} + func TestEnforceFastestSchedule_NonUniformSecondsField(t *testing.T) { t.Parallel() // a schedule that has a bunch of 5s gaps followed by a bunch of 1s gaps