Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/handlers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func isValidSchedule(schedule *models.ScheduleConfig) bool {

// Validate frequency
switch schedule.Frequency {
case "daily", "weekly", "monthly", "yearly":
case "minutely", "hourly", "daily", "weekly", "monthly", "yearly":
// Valid frequency
default:
return false
Expand Down
34 changes: 34 additions & 0 deletions internal/handlers/schedule_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package handlers

import (
"testing"

"github.com/feedloop/qhronos/internal/models"
"github.com/stretchr/testify/assert"
)

func TestIsValidSchedule(t *testing.T) {
valid := []*models.ScheduleConfig{
{Frequency: "minutely", Interval: 1},
{Frequency: "hourly", Interval: 1},
{Frequency: "daily", Interval: 1},
{Frequency: "weekly", Interval: 1, ByDay: []string{"MO", "WE"}},
{Frequency: "monthly", Interval: 1, ByMonthDay: []int{1, 15}},
{Frequency: "yearly", Interval: 1, ByMonth: []int{1, 12}},
}
for _, sched := range valid {
assert.True(t, isValidSchedule(sched), "should be valid: %+v", sched)
}

invalid := []*models.ScheduleConfig{
{Frequency: "secondly", Interval: 1},
{Frequency: "foo", Interval: 1},
{Frequency: "minutely", Interval: 0},
{Frequency: "weekly", Interval: 1, ByDay: []string{"XX"}},
{Frequency: "monthly", Interval: 1, ByMonthDay: []int{0, 32}},
{Frequency: "yearly", Interval: 1, ByMonth: []int{0, 13}},
}
for _, sched := range invalid {
assert.False(t, isValidSchedule(sched), "should be invalid: %+v", sched)
}
}
12 changes: 12 additions & 0 deletions internal/repository/occurrence_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,15 @@ func (r *OccurrenceRepository) GetLatestByOccurrenceID(ctx context.Context, occu
}
return &occurrence, nil
}

// CountCompletedByEventID returns the number of completed occurrences for a given event ID
func (r *OccurrenceRepository) CountCompletedByEventID(ctx context.Context, eventID uuid.UUID) (int, error) {
var count int
query := `SELECT COUNT(*) FROM occurrences WHERE event_id = $1 AND status = 'completed'`
err := r.db.GetContext(ctx, &count, query, eventID)
if err != nil {
r.logger.Error("Error counting completed occurrences", zap.Error(err), zap.String("event_id", eventID.String()))
return 0, err
}
return count, nil
}
25 changes: 25 additions & 0 deletions internal/scheduler/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,31 @@ func (d *Dispatcher) DispatchAction(ctx context.Context, sched *models.Schedule)
}
}

// Auto-inactivate recurring events if count or until is reached
if event.Schedule != nil && event.Status == models.EventStatusActive && finalStatus == models.OccurrenceStatusCompleted {
shouldInactivate := false
if event.Schedule.Count != nil {
completedCount, err := d.occurrenceRepo.CountCompletedByEventID(ctx, event.ID)
if err == nil && completedCount >= *event.Schedule.Count {
shouldInactivate = true
}
}
if !shouldInactivate && event.Schedule.Until != nil {
untilTime, err := time.Parse(time.RFC3339, *event.Schedule.Until)
if err == nil && time.Now().After(untilTime) {
shouldInactivate = true
}
}
if shouldInactivate {
event.Status = models.EventStatusInactive
if err := d.eventRepo.Update(ctx, event); err != nil {
d.logger.Error("Failed to auto-inactivate recurring event", zap.String("event_id", event.ID.String()), zap.Error(err))
} else {
d.logger.Info("Auto-inactivated recurring event", zap.String("event_id", event.ID.String()))
}
}
}

return dispatchError // This error determines if it goes to retry queue
}

Expand Down
46 changes: 46 additions & 0 deletions internal/scheduler/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,52 @@ func TestDispatcher(t *testing.T) {
}
assert.Equal(t, []string{"client3:c1", "client3:c2", "client3:c1", "client3:c2"}, mockNotifier.calls)
})

t.Run("auto-inactivate recurring event when count is reached", func(t *testing.T) {
cleanup()
mockHTTP := new(MockHTTPClient)
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP)
mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})),
}, nil)
whParams, _ := json.Marshal(models.WebhookActionParams{URL: "http://example.com/webhook"})
count := 2
event := &models.Event{
ID: uuid.New(),
Name: "Recurring Event",
Description: "Test recurring event with count limit",
StartTime: time.Now(),
Action: &models.Action{Type: models.ActionTypeWebhook, Params: whParams},
Status: models.EventStatusActive,
Metadata: []byte(`{"key": "value"}`),
Tags: pq.StringArray{"test"},
CreatedAt: time.Now(),
Schedule: &models.ScheduleConfig{Frequency: "daily", Interval: 1, Count: &count},
}
err := eventRepo.Create(ctx, event)
require.NoError(t, err)
for i := 0; i < 2; i++ {
schedule := &models.Schedule{
Occurrence: models.Occurrence{
OccurrenceID: uuid.New(),
EventID: event.ID,
ScheduledAt: time.Now().Add(time.Duration(i) * time.Hour),
},
Name: event.Name,
Description: event.Description,
Webhook: "http://example.com/webhook",
Metadata: event.Metadata,
Tags: event.Tags,
}
err = dispatcher.DispatchAction(ctx, schedule)
assert.NoError(t, err)
}
// After two completions, event should be inactive
updatedEvent, err := eventRepo.GetByID(ctx, event.ID)
require.NoError(t, err)
assert.Equal(t, models.EventStatusInactive, updatedEvent.Status)
})
}

func TestDispatcher_RedisOnlyDispatch(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions internal/scheduler/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
e.logger.Debug("Found future occurrences for event", zap.String("event_id", event.ID.String()), zap.Int("count", len(occurrences)))

for _, t := range occurrences {
// Prevent duplicate occurrences: check if one already exists for this event and time
exists, err := e.occurrenceRepo.ExistsAtTime(ctx, event.ID, t)
if err != nil {
e.logger.Error("Failed to check for existing occurrence", zap.Error(err), zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t))
continue
}
if exists {
e.logger.Debug("Occurrence already exists, skipping", zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t))
continue
}
occurrence := &models.Occurrence{
OccurrenceID: uuid.New(),
EventID: event.ID,
Expand Down
40 changes: 40 additions & 0 deletions internal/scheduler/expander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,44 @@ func TestEventExpander(t *testing.T) {
require.NoError(t, err)
assert.NotEmpty(t, results)
})

t.Run("minutely frequency event expansion is idempotent", func(t *testing.T) {
cleanup()
startTime := time.Now().Add(-2 * time.Minute).Truncate(time.Minute)
// Create a minutely schedule
scheduleConfig := &models.ScheduleConfig{
Frequency: "minutely",
Interval: 1,
}
event := &models.Event{
ID: uuid.New(),
Name: "Minutely Event",
Description: "Test minutely event for idempotency",
StartTime: startTime,
Webhook: "http://example.com",
Schedule: scheduleConfig,
Status: models.EventStatusActive,
Metadata: []byte(`{"key": "value"}`),
Tags: pq.StringArray{"test"},
CreatedAt: time.Now(),
}
err := eventRepo.Create(ctx, event)
require.NoError(t, err)
// Run expansion twice
err = expander.ExpandEvents(ctx)
require.NoError(t, err)
err = expander.ExpandEvents(ctx)
require.NoError(t, err)
// Get all occurrences for this event from the DB
occurrences, err := occurrenceRepo.ListByEventID(ctx, event.ID)
require.NoError(t, err)
// There should be only one occurrence per scheduled time
scheduledTimes := make(map[int64]struct{})
for _, occ := range occurrences {
ts := occ.ScheduledAt.Unix()
_, exists := scheduledTimes[ts]
assert.False(t, exists, "Duplicate occurrence for scheduled time: %v", occ.ScheduledAt)
scheduledTimes[ts] = struct{}{}
}
})
}
Loading