diff --git a/cron/trigger/trigger.go b/cron/trigger/trigger.go index d78adf404..e5a649e0c 100644 --- a/cron/trigger/trigger.go +++ b/cron/trigger/trigger.go @@ -248,6 +248,28 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat scheduledExecutionTimeUTC := trigger.nextRun.UTC() currentTimeUTC := s.clock.Now().UTC() + nextExecutionTime, nextRunErr := job.NextRun() + if nextRunErr != nil { + // .NextRun() will error if the job no longer exists + // or if there is no next run to schedule, which shouldn't happen with cron jobs + s.lggr.Errorw("task callback failed to schedule next run", "triggerID", triggerID) + } + + // Advance nextRun before blocking I/O so overlapping task invocations + // cannot reuse the same scheduled execution time. + muCh.Lock() + if callbackCh == nil { + muCh.Unlock() + return // unregistered already + } + s.triggers.Write(triggerID, cronTrigger{ + job: job, + nextRun: nextExecutionTime, + workflowID: trigger.workflowID, + close: closeCh, + }) + muCh.Unlock() + response := createTriggerResponse(scheduledExecutionTimeUTC) displayWorkflowName := metadata.DecodedWorkflowName @@ -309,24 +331,11 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat s.lggr.Debugw("task callback sending trigger response", "executionID", workflowExecutionID, "isLegacyExecutionID", isLegacyExecutionID, "triggerID", triggerID, "scheduledExecTimeUTC", scheduledExecutionTimeUTC.Format(time.RFC3339Nano), "actualExecTimeUTC", currentTimeUTC.Format(time.RFC3339Nano)) - nextExecutionTime, nextRunErr := job.NextRun() - if nextRunErr != nil { - // .NextRun() will error if the job no longer exists - // or if there is no next run to schedule, which shouldn't happen with cron jobs - s.lggr.Errorw("task callback failed to schedule next run", "executionID", workflowExecutionID, "triggerID", triggerID) - } - muCh.RLock() defer muCh.RUnlock() if callbackCh == nil { return // unregistered already } - 
s.triggers.Write(triggerID, cronTrigger{ - job: job, - nextRun: nextExecutionTime, - workflowID: metadata.WorkflowID, - close: closeCh, - }) select { case callbackCh <- response: @@ -349,7 +358,12 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat } // If service has already started, job will be scheduled immediately - job, err = s.scheduler.NewJob(jobDef, task, gocron.WithName(triggerID)) + job, err = s.scheduler.NewJob( + jobDef, + task, + gocron.WithName(triggerID), + gocron.WithSingletonMode(gocron.LimitModeWait), + ) if err != nil { s.lggr.Errorw("failed to create new job", "err", err) return nil, caperrors.NewPublicSystemError(fmt.Errorf("RegisterTrigger failed to create new job: %s", err), caperrors.Internal) diff --git a/cron/trigger/trigger_test.go b/cron/trigger/trigger_test.go index 49aca17d9..1bcdc3239 100644 --- a/cron/trigger/trigger_test.go +++ b/cron/trigger/trigger_test.go @@ -8,6 +8,7 @@ import ( "math" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -1258,6 +1259,126 @@ func TestCronTrigger_MultiTriggerFlag_ExecutionIDPaths(t *testing.T) { }) } +// blockingOrgResolver blocks the first Get call until release is closed, simulating +// slow org-resolver I/O inside the cron task callback before nextRun is advanced. 
+type blockingOrgResolver struct {
+	firstBlocked chan struct{} // closed by the first Get call right before it starts waiting
+	release      chan struct{} // the first Get call waits on this channel
+	getCount     atomic.Int32  // number of Get calls started so far
+	once         sync.Once     // guards the close of firstBlocked
+}
+
+// Get returns "test-org-id". Only the very first call blocks: it closes firstBlocked
+// and then waits until release is readable (e.g. closed) or ctx is done, in which
+// case it returns ctx.Err() instead.
+func (b *blockingOrgResolver) Get(ctx context.Context, owner string) (string, error) {
+	const orgID = "test-org-id"
+
+	// Every call except the first one answers immediately.
+	if b.getCount.Add(1) != 1 {
+		return orgID, nil
+	}
+
+	// First call: announce that we are about to block, then wait.
+	b.once.Do(func() { close(b.firstBlocked) })
+	select {
+	case <-ctx.Done():
+		return "", ctx.Err()
+	case <-b.release:
+		return orgID, nil
+	}
+}
+
+// The remaining methods are no-ops that report success.
+
+// Start does nothing and returns nil.
+func (b *blockingOrgResolver) Start(ctx context.Context) error { return nil }
+
+// Close does nothing and returns nil.
+func (b *blockingOrgResolver) Close() error { return nil }
+
+// HealthReport returns an empty, non-nil report.
+func (b *blockingOrgResolver) HealthReport() map[string]error { return map[string]error{} }
+
+// Ready always returns nil.
+func (b *blockingOrgResolver) Ready() error { return nil }
+
+// Name returns the fixed name "BlockingOrgResolver".
+func (b *blockingOrgResolver) Name() string { return "BlockingOrgResolver" }
+
+// Compile-time check that *blockingOrgResolver implements orgresolver.OrgResolver.
+var _ orgresolver.OrgResolver = (*blockingOrgResolver)(nil)
+
+// TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks ensures a cron trigger does not
+// deliver duplicate trigger event IDs when the task callback blocks (e.g. on org resolver I/O).
+func TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks(t *testing.T) {
+	const testWorkflowID = "00c0de5771c4f20ed242bedd2de9d3fdf9a0fbe5d03aefca9bdb29013a28de36"
+	const testTriggerID = testWorkflowID + "|cron-0"
+
+	releaseFirst := make(chan struct{})
+	// releaseResolver unblocks the first Get call. It is idempotent so it can be
+	// called both on the happy path and from test cleanup.
+	var releaseOnce sync.Once
+	releaseResolver := func() { releaseOnce.Do(func() { close(releaseFirst) }) }
+
+	orgResolver := &blockingOrgResolver{
+		firstBlocked: make(chan struct{}),
+		release:      releaseFirst,
+	}
+
+	triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1})
+	require.NoError(t, err)
+
+	ts, err := NewTriggerService(logger.Nop(), clockwork.NewRealClock(), limits.Factory{})
+	require.NoError(t, err)
+	err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{
+		Config:      string(triggerConfig),
+		OrgResolver: orgResolver,
+	})
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, ts.Close()) })
+	// Registered after the Close cleanup on purpose: cleanups run last-added-first,
+	// so the resolver is released before ts.Close() runs. Without this, an early
+	// t.Fatal would leave the first Get call waiting on release while the service
+	// shuts down.
+	// NOTE(review): whether Close can finish with a callback still blocked depends on
+	// the service cancelling the callback's context, which is not visible here.
+	t.Cleanup(releaseResolver)
+
+	metadata := capabilities.RequestMetadata{
+		WorkflowID:    testWorkflowID,
+		WorkflowOwner: "0x0000000000000000000000000000000000000001",
+		ReferenceID:   workflows.GetTriggerReferenceID(0),
+	}
+	callback, capErr := ts.RegisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond})
+	require.Nil(t, capErr)
+
+	// Forward everything from the trigger's callback channel into eventsCh.
+	// eventsCh is closed, and collectDone signalled, once callback is closed.
+	eventsCh := make(chan capabilities.TriggerAndId[*crontypedapi.Payload], 4)
+	collectDone := make(chan struct{})
+	go func() {
+		defer close(collectDone)
+		for event := range callback {
+			eventsCh <- event
+		}
+		close(eventsCh)
+	}()
+
+	// Wait until the first task invocation has reached the org resolver and is blocked there.
+	select {
+	case <-orgResolver.firstBlocked:
+	case <-time.After(3 * time.Second):
+		t.Fatal("timed out waiting for first cron task to block in org resolver")
+	}
+
+	// While the first task is still blocked, no trigger event should be delivered.
+	select {
+	case event := <-eventsCh:
+		t.Fatalf("unexpected trigger event while callback is blocked: %s", event.Id)
+	case <-time.After(1 * time.Second):
+	}
+
+	releaseResolver()
+
+	var events []capabilities.TriggerAndId[*crontypedapi.Payload]
+	select {
+	case event := <-eventsCh:
+		events = append(events, event)
+	case <-time.After(3 * time.Second):
+		t.Fatal("timed out waiting for trigger event after blocked callback completes")
+	}
+
+	// Fixed window: keep reading so duplicate IDs cannot slip through after the first event.
+	collectUntil := time.After(2 * time.Second)
+collectLoop:
+	for {
+		select {
+		case event, ok := <-eventsCh:
+			if !ok {
+				break collectLoop
+			}
+			events = append(events, event)
+		case <-collectUntil:
+			break collectLoop
+		}
+	}
+
+	// Every collected event must carry a distinct ID.
+	seenEventIDs := make(map[string]struct{}, len(events))
+	for _, event := range events {
+		_, duplicate := seenEventIDs[event.Id]
+		require.False(t, duplicate, "trigger event ID %s was delivered more than once", event.Id)
+		seenEventIDs[event.Id] = struct{}{}
+	}
+
+	require.Nil(t, ts.UnregisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond}))
+	// NOTE(review): assumes unregistering closes the callback channel so the
+	// forwarding goroutine exits; otherwise this receive never returns. Confirm.
+	<-collectDone
+}
+
 func TestEnforceFastestSchedule_NonUniformSecondsField(t *testing.T) {
 	t.Parallel()
 	// a schedule that has a bunch of 5s gaps followed by a bunch of 1s gaps