Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions cron/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,28 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat
scheduledExecutionTimeUTC := trigger.nextRun.UTC()
currentTimeUTC := s.clock.Now().UTC()

nextExecutionTime, nextRunErr := job.NextRun()
if nextRunErr != nil {
// .NextRun() will error if the job no longer exists
// or if there is no next run to schedule, which shouldn't happen with cron jobs
s.lggr.Errorw("task callback failed to schedule next run", "triggerID", triggerID)
}

// Advance nextRun before blocking I/O so overlapping task invocations
// cannot reuse the same scheduled execution time.
muCh.Lock()
if callbackCh == nil {
muCh.Unlock()
return // unregistered already
}
s.triggers.Write(triggerID, cronTrigger{
job: job,
nextRun: nextExecutionTime,
workflowID: trigger.workflowID,
close: closeCh,
})
muCh.Unlock()

response := createTriggerResponse(scheduledExecutionTimeUTC)

displayWorkflowName := metadata.DecodedWorkflowName
Expand Down Expand Up @@ -309,24 +331,11 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat

s.lggr.Debugw("task callback sending trigger response", "executionID", workflowExecutionID, "isLegacyExecutionID", isLegacyExecutionID, "triggerID", triggerID, "scheduledExecTimeUTC", scheduledExecutionTimeUTC.Format(time.RFC3339Nano), "actualExecTimeUTC", currentTimeUTC.Format(time.RFC3339Nano))

nextExecutionTime, nextRunErr := job.NextRun()
if nextRunErr != nil {
// .NextRun() will error if the job no longer exists
// or if there is no next run to schedule, which shouldn't happen with cron jobs
s.lggr.Errorw("task callback failed to schedule next run", "executionID", workflowExecutionID, "triggerID", triggerID)
}

muCh.RLock()
defer muCh.RUnlock()
if callbackCh == nil {
return // unregistered already
}
s.triggers.Write(triggerID, cronTrigger{
job: job,
nextRun: nextExecutionTime,
workflowID: metadata.WorkflowID,
close: closeCh,
})

select {
case callbackCh <- response:
Expand All @@ -349,7 +358,12 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat
}

// If service has already started, job will be scheduled immediately
job, err = s.scheduler.NewJob(jobDef, task, gocron.WithName(triggerID))
job, err = s.scheduler.NewJob(
jobDef,
task,
gocron.WithName(triggerID),
gocron.WithSingletonMode(gocron.LimitModeWait),
)
if err != nil {
s.lggr.Errorw("failed to create new job", "err", err)
return nil, caperrors.NewPublicSystemError(fmt.Errorf("RegisterTrigger failed to create new job: %s", err), caperrors.Internal)
Expand Down
121 changes: 121 additions & 0 deletions cron/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"math"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1258,6 +1259,126 @@
})
}

// blockingOrgResolver is a test double for the org resolver. Its first Get
// call parks until release is closed (or the caller's context ends); every
// later call returns immediately. It is used to simulate slow org-resolver
// I/O inside the cron task callback.
type blockingOrgResolver struct {
	// firstBlocked is closed by the first Get call just before it parks,
	// letting the test observe that the callback is now blocked.
	firstBlocked chan struct{}
	// release is closed by the test to let the parked first call continue.
	release chan struct{}
	// getCount counts every Get invocation; only call number one blocks.
	getCount atomic.Int32
	// once ensures firstBlocked is closed at most one time.
	once sync.Once
}

// Get returns a fixed org ID. The first invocation closes firstBlocked and
// then waits for release to be closed; if ctx is done first it returns an
// empty string and ctx.Err(). All subsequent invocations return right away.
func (r *blockingOrgResolver) Get(ctx context.Context, owner string) (string, error) {
	const orgID = "test-org-id"

	// Every call is counted; anything other than the very first skips the wait.
	if call := r.getCount.Add(1); call != 1 {
		return orgID, nil
	}

	// Tell the test that the first callback has reached the resolver.
	r.once.Do(func() { close(r.firstBlocked) })

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-r.release:
		return orgID, nil
	}
}

// Start is a no-op for this test double; it always reports success.
func (r *blockingOrgResolver) Start(ctx context.Context) error {
	return nil
}

Check failure on line 1283 in cron/trigger/trigger_test.go

View workflow job for this annotation

GitHub Actions / lint (cron)

File is not properly formatted (goimports)
// Close is a no-op for this test double; it always reports success.
func (r *blockingOrgResolver) Close() error {
	return nil
}
// HealthReport returns an empty, non-nil report: the test double has no
// health state to expose.
func (r *blockingOrgResolver) HealthReport() map[string]error {
	return make(map[string]error)
}
// Ready always reports the test double as ready.
func (r *blockingOrgResolver) Ready() error {
	return nil
}
// Name returns the fixed service name of the test double.
func (r *blockingOrgResolver) Name() string {
	return "BlockingOrgResolver"
}

// Compile-time assertion that *blockingOrgResolver implements orgresolver.OrgResolver.
var _ orgresolver.OrgResolver = (*blockingOrgResolver)(nil)

// TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks ensures a cron trigger does not
// deliver duplicate trigger event IDs when the task callback blocks (e.g. on org resolver I/O).
//
// Flow:
//  1. Register a trigger on the everySecond schedule with an org resolver whose first Get blocks.
//  2. Wait until the first task is parked in the resolver and check that no event is
//     delivered for one second while it stays parked.
//  3. Release the resolver, require at least one event, then keep collecting for a
//     fixed two-second window.
//  4. Assert that every collected event ID is unique, unregister the trigger and wait
//     for the forwarding goroutine to exit.
//
// The test runs against a real clock, so it takes several seconds of wall time.
func TestCronTrigger_DelayedDuplicateEventWhenCallbackBlocks(t *testing.T) {
const testWorkflowID = "00c0de5771c4f20ed242bedd2de9d3fdf9a0fbe5d03aefca9bdb29013a28de36"
const testTriggerID = testWorkflowID + "|cron-0"

// releaseFirst is shared with the resolver: closing it unblocks the first Get call.
releaseFirst := make(chan struct{})
orgResolver := &blockingOrgResolver{
firstBlocked: make(chan struct{}),
release: releaseFirst,
}

triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1})
require.NoError(t, err)

// Real clock on purpose: the assertions below rely on wall-clock timeouts.
ts, err := NewTriggerService(logger.Nop(), clockwork.NewRealClock(), limits.Factory{})
require.NoError(t, err)
err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{
Config: string(triggerConfig),
OrgResolver: orgResolver,
})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, ts.Close()) })

metadata := capabilities.RequestMetadata{
WorkflowID: testWorkflowID,
WorkflowOwner: "0x0000000000000000000000000000000000000001",
ReferenceID: workflows.GetTriggerReferenceID(0),
}
callback, capErr := ts.RegisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond})
require.Nil(t, capErr)

// Forward events from the trigger's callback channel into a small buffered channel
// so the test can read them with timeouts. The goroutine ends when callback is
// closed; it then closes eventsCh and signals collectDone.
eventsCh := make(chan capabilities.TriggerAndId[*crontypedapi.Payload], 4)
collectDone := make(chan struct{})
go func() {
defer close(collectDone)
for event := range callback {
eventsCh <- event
}
close(eventsCh)
}()

// Wait for the first Get call to reach the resolver and park there.
select {
case <-orgResolver.firstBlocked:
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for first cron task to block in org resolver")
}

// While the first task is still blocked, no trigger event should be delivered.
select {
case event := <-eventsCh:
t.Fatalf("unexpected trigger event while callback is blocked: %s", event.Id)
case <-time.After(1 * time.Second):
}

// Unblock the parked resolver call so the first task can finish.
close(releaseFirst)

// At least one event must arrive once the blocked callback completes.
var events []capabilities.TriggerAndId[*crontypedapi.Payload]
select {
case event := <-eventsCh:
events = append(events, event)
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for trigger event after blocked callback completes")
}

// Fixed window: keep reading so duplicate IDs cannot slip through after the first event.
collectUntil := time.After(2 * time.Second)
collectLoop:
for {
select {
case event, ok := <-eventsCh:
if !ok {
// eventsCh closed: the forwarder has stopped, nothing more to collect.
break collectLoop
}
events = append(events, event)
case <-collectUntil:
break collectLoop
}
}

// Every collected event must carry a distinct ID.
seenEventIDs := make(map[string]struct{}, len(events))
for _, event := range events {
_, duplicate := seenEventIDs[event.Id]
require.False(t, duplicate, "trigger event ID %s was delivered more than once", event.Id)
seenEventIDs[event.Id] = struct{}{}
}

require.Nil(t, ts.UnregisterTrigger(t.Context(), testTriggerID, metadata, &crontypedapi.Config{Schedule: everySecond}))
// Wait for the forwarding goroutine to exit; this requires the callback channel to
// be closed — presumably done by UnregisterTrigger (TODO confirm).
<-collectDone
}

func TestEnforceFastestSchedule_NonUniformSecondsField(t *testing.T) {
t.Parallel()
// a schedule that has a bunch of 5s gaps followed by a bunch of 1s gaps
Expand Down
Loading