From 475ee1aa9697bf4b0b510093fcd761febdde1741 Mon Sep 17 00:00:00 2001 From: Ahmad Rizqi Meydiarso Date: Mon, 12 May 2025 14:40:28 +0800 Subject: [PATCH] fix(expander): make expander idempotent and prevent duplicate occurrences for recurring events - Check for existing occurrence before scheduling. Add test to verify no duplicates for minutely schedules. Ensures safe repeated expansion runs. --- internal/handlers/event_handler.go | 2 +- internal/handlers/schedule_validation_test.go | 34 ++++++++++++++ internal/repository/occurrence_repository.go | 12 +++++ internal/scheduler/dispatcher.go | 25 ++++++++++ internal/scheduler/dispatcher_test.go | 46 +++++++++++++++++++ internal/scheduler/expander.go | 10 ++++ internal/scheduler/expander_test.go | 40 ++++++++++++++++ 7 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 internal/handlers/schedule_validation_test.go diff --git a/internal/handlers/event_handler.go b/internal/handlers/event_handler.go index e3dd011..f3ea5bf 100644 --- a/internal/handlers/event_handler.go +++ b/internal/handlers/event_handler.go @@ -358,7 +358,7 @@ func isValidSchedule(schedule *models.ScheduleConfig) bool { // Validate frequency switch schedule.Frequency { - case "daily", "weekly", "monthly", "yearly": + case "minutely", "hourly", "daily", "weekly", "monthly", "yearly": // Valid frequency default: return false diff --git a/internal/handlers/schedule_validation_test.go b/internal/handlers/schedule_validation_test.go new file mode 100644 index 0000000..39f1c72 --- /dev/null +++ b/internal/handlers/schedule_validation_test.go @@ -0,0 +1,34 @@ +package handlers + +import ( + "testing" + + "github.com/feedloop/qhronos/internal/models" + "github.com/stretchr/testify/assert" +) + +func TestIsValidSchedule(t *testing.T) { + valid := []*models.ScheduleConfig{ + {Frequency: "minutely", Interval: 1}, + {Frequency: "hourly", Interval: 1}, + {Frequency: "daily", Interval: 1}, + {Frequency: "weekly", Interval: 1, ByDay: []string{"MO", "WE"}}, + {Frequency: "monthly", Interval: 1, ByMonthDay: []int{1, 15}}, + {Frequency: "yearly", Interval: 1, ByMonth: []int{1, 12}}, + } + for _, sched := range valid { + assert.True(t, isValidSchedule(sched), "should be valid: %+v", sched) + } + + invalid := []*models.ScheduleConfig{ + {Frequency: "secondly", Interval: 1}, + {Frequency: "foo", Interval: 1}, + {Frequency: "minutely", Interval: 0}, + {Frequency: "weekly", Interval: 1, ByDay: []string{"XX"}}, + {Frequency: "monthly", Interval: 1, ByMonthDay: []int{0, 32}}, + {Frequency: "yearly", Interval: 1, ByMonth: []int{0, 13}}, + } + for _, sched := range invalid { + assert.False(t, isValidSchedule(sched), "should be invalid: %+v", sched) + } +} diff --git a/internal/repository/occurrence_repository.go b/internal/repository/occurrence_repository.go index 5be6090..8989076 100644 --- a/internal/repository/occurrence_repository.go +++ b/internal/repository/occurrence_repository.go @@ -204,3 +204,15 @@ func (r *OccurrenceRepository) GetLatestByOccurrenceID(ctx context.Context, occu } return &occurrence, nil } + +// CountCompletedByEventID returns the number of completed occurrences for a given event ID +func (r *OccurrenceRepository) CountCompletedByEventID(ctx context.Context, eventID uuid.UUID) (int, error) { + var count int + query := `SELECT COUNT(*) FROM occurrences WHERE event_id = $1 AND status = 'completed'` + err := r.db.GetContext(ctx, &count, query, eventID) + if err != nil { + r.logger.Error("Error counting completed occurrences", zap.Error(err), zap.String("event_id", eventID.String())) + return 0, err + } + return count, nil +} diff --git a/internal/scheduler/dispatcher.go b/internal/scheduler/dispatcher.go index a7cc2f5..acc4786 100644 --- a/internal/scheduler/dispatcher.go +++ b/internal/scheduler/dispatcher.go @@ -164,6 +164,31 @@ func (d *Dispatcher) DispatchAction(ctx context.Context, sched *models.Schedule) } } + // Auto-inactivate recurring events if count or until is reached + if event.Schedule != nil && event.Status == models.EventStatusActive && finalStatus == models.OccurrenceStatusCompleted { + shouldInactivate := false + if event.Schedule.Count != nil { + completedCount, err := d.occurrenceRepo.CountCompletedByEventID(ctx, event.ID) + if err == nil && completedCount >= *event.Schedule.Count { + shouldInactivate = true + } + } + if !shouldInactivate && event.Schedule.Until != nil { + untilTime, err := time.Parse(time.RFC3339, *event.Schedule.Until) + if err == nil && time.Now().After(untilTime) { + shouldInactivate = true + } + } + if shouldInactivate { + event.Status = models.EventStatusInactive + if err := d.eventRepo.Update(ctx, event); err != nil { + d.logger.Error("Failed to auto-inactivate recurring event", zap.String("event_id", event.ID.String()), zap.Error(err)) + } else { + d.logger.Info("Auto-inactivated recurring event", zap.String("event_id", event.ID.String())) + } + } + } + return dispatchError // This error determines if it goes to retry queue } diff --git a/internal/scheduler/dispatcher_test.go b/internal/scheduler/dispatcher_test.go index 7e37b06..33b87f2 100644 --- a/internal/scheduler/dispatcher_test.go +++ b/internal/scheduler/dispatcher_test.go @@ -327,6 +327,52 @@ func TestDispatcher(t *testing.T) { } assert.Equal(t, []string{"client3:c1", "client3:c2", "client3:c1", "client3:c2"}, mockNotifier.calls) }) + + t.Run("auto-inactivate recurring event when count is reached", func(t *testing.T) { + cleanup() + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), + }, nil) + whParams, _ := json.Marshal(models.WebhookActionParams{URL: "http://example.com/webhook"}) + count := 2 + event := &models.Event{ + ID: uuid.New(), + Name: "Recurring Event", + Description: "Test recurring event with count limit", + StartTime: time.Now(), + Action: &models.Action{Type: models.ActionTypeWebhook, Params: whParams}, + Status: models.EventStatusActive, + Metadata: []byte(`{"key": "value"}`), + Tags: pq.StringArray{"test"}, + CreatedAt: time.Now(), + Schedule: &models.ScheduleConfig{Frequency: "daily", Interval: 1, Count: &count}, + } + err := eventRepo.Create(ctx, event) + require.NoError(t, err) + for i := 0; i < 2; i++ { + schedule := &models.Schedule{ + Occurrence: models.Occurrence{ + OccurrenceID: uuid.New(), + EventID: event.ID, + ScheduledAt: time.Now().Add(time.Duration(i) * time.Hour), + }, + Name: event.Name, + Description: event.Description, + Webhook: "http://example.com/webhook", + Metadata: event.Metadata, + Tags: event.Tags, + } + err = dispatcher.DispatchAction(ctx, schedule) + assert.NoError(t, err) + } + // After two completions, event should be inactive + updatedEvent, err := eventRepo.GetByID(ctx, event.ID) + require.NoError(t, err) + assert.Equal(t, models.EventStatusInactive, updatedEvent.Status) + }) } func TestDispatcher_RedisOnlyDispatch(t *testing.T) { diff --git a/internal/scheduler/expander.go b/internal/scheduler/expander.go index 2f0156a..4cc8377 100644 --- a/internal/scheduler/expander.go +++ b/internal/scheduler/expander.go @@ -235,6 +235,16 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event e.logger.Debug("Found future occurrences for event", zap.String("event_id", event.ID.String()), zap.Int("count", len(occurrences))) for _, t := range occurrences { + // Prevent duplicate occurrences: check if one already exists for this event and time + exists, err := e.occurrenceRepo.ExistsAtTime(ctx, event.ID, t) + if err != nil { + e.logger.Error("Failed to check for existing occurrence", zap.Error(err), zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t)) + continue + } + if exists { + e.logger.Debug("Occurrence already exists, skipping", zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t)) + continue + } occurrence := &models.Occurrence{ OccurrenceID: uuid.New(), EventID: event.ID, diff --git a/internal/scheduler/expander_test.go b/internal/scheduler/expander_test.go index f665f49..30b031a 100644 --- a/internal/scheduler/expander_test.go +++ b/internal/scheduler/expander_test.go @@ -384,4 +384,44 @@ func TestEventExpander(t *testing.T) { require.NoError(t, err) assert.NotEmpty(t, results) }) + + t.Run("minutely frequency event expansion is idempotent", func(t *testing.T) { + cleanup() + startTime := time.Now().Add(-2 * time.Minute).Truncate(time.Minute) + // Create a minutely schedule + scheduleConfig := &models.ScheduleConfig{ + Frequency: "minutely", + Interval: 1, + } + event := &models.Event{ + ID: uuid.New(), + Name: "Minutely Event", + Description: "Test minutely event for idempotency", + StartTime: startTime, + Webhook: "http://example.com", + Schedule: scheduleConfig, + Status: models.EventStatusActive, + Metadata: []byte(`{"key": "value"}`), + Tags: pq.StringArray{"test"}, + CreatedAt: time.Now(), + } + err := eventRepo.Create(ctx, event) + require.NoError(t, err) + // Run expansion twice + err = expander.ExpandEvents(ctx) + require.NoError(t, err) + err = expander.ExpandEvents(ctx) + require.NoError(t, err) + // Get all occurrences for this event from the DB + occurrences, err := occurrenceRepo.ListByEventID(ctx, event.ID) + require.NoError(t, err) + // There should be only one occurrence per scheduled time + scheduledTimes := make(map[int64]struct{}) + for _, occ := range occurrences { + ts := occ.ScheduledAt.Unix() + _, exists := scheduledTimes[ts] + assert.False(t, exists, "Duplicate occurrence for scheduled time: %v", occ.ScheduledAt) + scheduledTimes[ts] = struct{}{} + } + }) }